1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49
50
51 import ganeti.http.client
52
53
54
55 _RPC_CONNECT_TIMEOUT = 5
56
57 _RPC_CLIENT_HEADERS = [
58 "Content-type: %s" % http.HTTP_APP_JSON,
59 "Expect:",
60 ]
61
62
63 _TMO_URGENT = 60
64 _TMO_FAST = 5 * 60
65 _TMO_NORMAL = 15 * 60
66 _TMO_SLOW = 3600
67 _TMO_4HRS = 4 * 3600
68 _TMO_1DAY = 86400
69
70
71
72
73
74
75
76
77 _TIMEOUTS = {
78 }
82 """Initializes the module-global HTTP client manager.
83
84 Must be called before using any RPC function and while exactly one thread is
85 running.
86
87 """
88
89
90
91 assert threading.activeCount() == 1, \
92 "Found more than one active thread when initializing pycURL"
93
94 logging.info("Using PycURL %s", pycurl.version)
95
96 pycurl.global_init(pycurl.GLOBAL_ALL)
97
100 """Stops the module-global HTTP client manager.
101
102 Must be called before quitting the program and while exactly one thread is
103 running.
104
105 """
106 pycurl.global_cleanup()
107
110 noded_cert = str(constants.NODED_CERT_FILE)
111
112 curl.setopt(pycurl.FOLLOWLOCATION, False)
113 curl.setopt(pycurl.CAINFO, noded_cert)
114 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115 curl.setopt(pycurl.SSL_VERIFYPEER, True)
116 curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117 curl.setopt(pycurl.SSLCERT, noded_cert)
118 curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119 curl.setopt(pycurl.SSLKEY, noded_cert)
120 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121
122
123
124
125 _threading = threading
130 """Returns a per-thread HTTP client pool.
131
132 @rtype: L{http.client.HttpClientPool}
133
134 """
135 try:
136 pool = self.hcp
137 except AttributeError:
138 pool = http.client.HttpClientPool(_ConfigRpcCurl)
139 self.hcp = pool
140
141 return pool
142
143
144
145 del _threading
146
147
148 _thread_local = _RpcThreadLocal()
152 """Timeout decorator.
153
154 When applied to a rpc call_* function, it updates the global timeout
155 table with the given function/timeout.
156
157 """
158 def decorator(f):
159 name = f.__name__
160 assert name.startswith("call_")
161 _TIMEOUTS[name[len("call_"):]] = secs
162 return f
163 return decorator
164
167 """RPC-wrapper decorator.
168
169 When applied to a function, it runs it with the RPC system
170 initialized, and it shutsdown the system afterwards. This means the
171 function must be called without RPC being initialized.
172
173 """
174 def wrapper(*args, **kwargs):
175 Init()
176 try:
177 return fn(*args, **kwargs)
178 finally:
179 Shutdown()
180 return wrapper
181
184 """RPC Result class.
185
186 This class holds an RPC result. It is needed since in multi-node
187 calls we can't raise an exception just because one one out of many
188 failed, and therefore we use this class to encapsulate the result.
189
190 @ivar data: the data payload, for successful results, or None
191 @ivar call: the name of the RPC call
192 @ivar node: the name of the node to which we made the call
193 @ivar offline: whether the operation failed because the node was
194 offline, as opposed to actual failure; offline=True will always
195 imply failed=True, in order to allow simpler checking if
196 the user doesn't care about the exact failure mode
197 @ivar fail_msg: the error message if the call failed
198
199 """
200 - def __init__(self, data=None, failed=False, offline=False,
201 call=None, node=None):
202 self.offline = offline
203 self.call = call
204 self.node = node
205
206 if offline:
207 self.fail_msg = "Node is marked offline"
208 self.data = self.payload = None
209 elif failed:
210 self.fail_msg = self._EnsureErr(data)
211 self.data = self.payload = None
212 else:
213 self.data = data
214 if not isinstance(self.data, (tuple, list)):
215 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216 type(self.data))
217 self.payload = None
218 elif len(data) != 2:
219 self.fail_msg = ("RPC layer error: invalid result length (%d), "
220 "expected 2" % len(self.data))
221 self.payload = None
222 elif not self.data[0]:
223 self.fail_msg = self._EnsureErr(self.data[1])
224 self.payload = None
225 else:
226
227 self.fail_msg = None
228 self.payload = data[1]
229
230 for attr_name in ["call", "data", "fail_msg",
231 "node", "offline", "payload"]:
232 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233
234 @staticmethod
236 """Helper to ensure we return a 'True' value for error."""
237 if val:
238 return val
239 else:
240 return "No error information"
241
242 - def Raise(self, msg, prereq=False, ecode=None):
243 """If the result has failed, raise an OpExecError.
244
245 This is used so that LU code doesn't have to check for each
246 result, but instead can call this function.
247
248 """
249 if not self.fail_msg:
250 return
251
252 if not msg:
253 msg = ("Call '%s' to node '%s' has failed: %s" %
254 (self.call, self.node, self.fail_msg))
255 else:
256 msg = "%s: %s" % (msg, self.fail_msg)
257 if prereq:
258 ec = errors.OpPrereqError
259 else:
260 ec = errors.OpExecError
261 if ecode is not None:
262 args = (msg, ecode)
263 else:
264 args = (msg, )
265 raise ec(*args)
266
271 """Return addresses for given node names.
272
273 @type node_list: list
274 @param node_list: List of node names
275 @type ssc: class
276 @param ssc: SimpleStore class that is used to obtain node->ip mappings
277 @type nslookup_fn: callable
278 @param nslookup_fn: function use to do NS lookup
279 @rtype: list of addresses and/or None's
280 @returns: List of corresponding addresses, if found
281
282 """
283 ss = ssc()
284 iplist = ss.GetNodePrimaryIPList()
285 family = ss.GetPrimaryIPFamily()
286 addresses = []
287 ipmap = dict(entry.split() for entry in iplist)
288 for node in node_list:
289 address = ipmap.get(node)
290 if address is None:
291 address = nslookup_fn(node, family=family)
292 addresses.append(address)
293
294 return addresses
295
298 """RPC Client class.
299
300 This class, given a (remote) method name, a list of parameters and a
301 list of nodes, will contact (in parallel) all nodes, and return a
302 dict of results (key: node name, value: result).
303
304 One current bug is that generic failure is still signaled by
305 'False' result, which is not good. This overloading of values can
306 cause bugs.
307
308 """
310 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311 " timeouts table")
312 self.procedure = procedure
313 self.body = body
314 self.port = port
315 self._request = {}
316 self._address_lookup_fn = address_lookup_fn
317
318 - def ConnectList(self, node_list, address_list=None, read_timeout=None):
319 """Add a list of nodes to the target nodes.
320
321 @type node_list: list
322 @param node_list: the list of node names to connect
323 @type address_list: list or None
324 @keyword address_list: either None or a list with node addresses,
325 which must have the same length as the node list
326 @type read_timeout: int
327 @param read_timeout: overwrites default timeout for operation
328
329 """
330 if address_list is None:
331
332 address_list = self._address_lookup_fn(node_list)
333
334 assert len(node_list) == len(address_list), \
335 "Name and address lists must have the same length"
336
337 for node, address in zip(node_list, address_list):
338 self.ConnectNode(node, address, read_timeout=read_timeout)
339
340 - def ConnectNode(self, name, address=None, read_timeout=None):
341 """Add a node to the target list.
342
343 @type name: str
344 @param name: the node name
345 @type address: str
346 @param address: the node address, if known
347 @type read_timeout: int
348 @param read_timeout: overwrites default timeout for operation
349
350 """
351 if address is None:
352
353 address = self._address_lookup_fn([name])[0]
354
355 assert(address is not None)
356
357 if read_timeout is None:
358 read_timeout = _TIMEOUTS[self.procedure]
359
360 self._request[name] = \
361 http.client.HttpClientRequest(str(address), self.port,
362 http.HTTP_PUT, str("/%s" % self.procedure),
363 headers=_RPC_CLIENT_HEADERS,
364 post_data=str(self.body),
365 read_timeout=read_timeout)
366
368 """Call nodes and return results.
369
370 @rtype: list
371 @return: List of RPC results
372
373 """
374 if not http_pool:
375 http_pool = http.client.HttpClientPool(_ConfigRpcCurl)
376
377 http_pool.ProcessRequests(self._request.values())
378
379 results = {}
380
381 for name, req in self._request.iteritems():
382 if req.success and req.resp_status_code == http.HTTP_OK:
383 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384 node=name, call=self.procedure)
385 continue
386
387
388 if req.error:
389 msg = req.error
390 else:
391 msg = req.resp_body
392
393 logging.error("RPC error in %s from node %s: %s",
394 self.procedure, name, msg)
395 results[name] = RpcResult(data=msg, failed=True, node=name,
396 call=self.procedure)
397
398 return results
399
402 """Encodes import/export I/O information.
403
404 """
405 if ieio == constants.IEIO_RAW_DISK:
406 assert len(ieioargs) == 1
407 return (ieioargs[0].ToDict(), )
408
409 if ieio == constants.IEIO_SCRIPT:
410 assert len(ieioargs) == 2
411 return (ieioargs[0].ToDict(), ieioargs[1])
412
413 return ieioargs
414
417 """RPC runner class"""
418
420 """Initialized the rpc runner.
421
422 @type cfg: C{config.ConfigWriter}
423 @param cfg: the configuration object that will be used to get data
424 about the cluster
425
426 """
427 self._cfg = cfg
428 self.port = netutils.GetDaemonPort(constants.NODED)
429
430 - def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431 """Convert the given instance to a dict.
432
433 This is done via the instance's ToDict() method and additionally
434 we fill the hvparams with the cluster defaults.
435
436 @type instance: L{objects.Instance}
437 @param instance: an Instance object
438 @type hvp: dict or None
439 @param hvp: a dictionary with overridden hypervisor parameters
440 @type bep: dict or None
441 @param bep: a dictionary with overridden backend parameters
442 @type osp: dict or None
443 @param osp: a dictionary with overridden os parameters
444 @rtype: dict
445 @return: the instance dict, with the hvparams filled with the
446 cluster defaults
447
448 """
449 idict = instance.ToDict()
450 cluster = self._cfg.GetClusterInfo()
451 idict["hvparams"] = cluster.FillHV(instance)
452 if hvp is not None:
453 idict["hvparams"].update(hvp)
454 idict["beparams"] = cluster.FillBE(instance)
455 if bep is not None:
456 idict["beparams"].update(bep)
457 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458 if osp is not None:
459 idict["osparams"].update(osp)
460 for nic in idict["nics"]:
461 nic['nicparams'] = objects.FillDict(
462 cluster.nicparams[constants.PP_DEFAULT],
463 nic['nicparams'])
464 return idict
465
466 - def _ConnectList(self, client, node_list, call, read_timeout=None):
467 """Helper for computing node addresses.
468
469 @type client: L{ganeti.rpc.Client}
470 @param client: a C{Client} instance
471 @type node_list: list
472 @param node_list: the node list we should connect
473 @type call: string
474 @param call: the name of the remote procedure call, for filling in
475 correctly any eventual offline nodes' results
476 @type read_timeout: int
477 @param read_timeout: overwrites the default read timeout for the
478 given operation
479
480 """
481 all_nodes = self._cfg.GetAllNodesInfo()
482 name_list = []
483 addr_list = []
484 skip_dict = {}
485 for node in node_list:
486 if node in all_nodes:
487 if all_nodes[node].offline:
488 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489 continue
490 val = all_nodes[node].primary_ip
491 else:
492 val = None
493 addr_list.append(val)
494 name_list.append(node)
495 if name_list:
496 client.ConnectList(name_list, address_list=addr_list,
497 read_timeout=read_timeout)
498 return skip_dict
499
500 - def _ConnectNode(self, client, node, call, read_timeout=None):
501 """Helper for computing one node's address.
502
503 @type client: L{ganeti.rpc.Client}
504 @param client: a C{Client} instance
505 @type node: str
506 @param node: the node we should connect
507 @type call: string
508 @param call: the name of the remote procedure call, for filling in
509 correctly any eventual offline nodes' results
510 @type read_timeout: int
511 @param read_timeout: overwrites the default read timeout for the
512 given operation
513
514 """
515 node_info = self._cfg.GetNodeInfo(node)
516 if node_info is not None:
517 if node_info.offline:
518 return RpcResult(node=node, offline=True, call=call)
519 addr = node_info.primary_ip
520 else:
521 addr = None
522 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523
524 - def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525 """Helper for making a multi-node call
526
527 """
528 body = serializer.DumpJson(args, indent=False)
529 c = Client(procedure, body, self.port)
530 skip_dict = self._ConnectList(c, node_list, procedure,
531 read_timeout=read_timeout)
532 skip_dict.update(c.GetResults())
533 return skip_dict
534
535 @classmethod
536 - def _StaticMultiNodeCall(cls, node_list, procedure, args,
537 address_list=None, read_timeout=None):
546
548 """Helper for making a single-node call
549
550 """
551 body = serializer.DumpJson(args, indent=False)
552 c = Client(procedure, body, self.port)
553 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554 if result is None:
555
556 result = c.GetResults()[node]
557 return result
558
559 @classmethod
568
569 @staticmethod
571 """Compresses a string for transport over RPC.
572
573 Small amounts of data are not compressed.
574
575 @type data: str
576 @param data: Data
577 @rtype: tuple
578 @return: Encoded data to send
579
580 """
581
582 if len(data) < 512:
583 return (constants.RPC_ENCODING_NONE, data)
584
585
586 return (constants.RPC_ENCODING_ZLIB_BASE64,
587 base64.b64encode(zlib.compress(data, 3)))
588
589
590
591
592
593 @_RpcTimeout(_TMO_URGENT)
595 """Gets the logical volumes present in a given volume group.
596
597 This is a multi-node call.
598
599 """
600 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
601
602 @_RpcTimeout(_TMO_URGENT)
604 """Gets the volume group list.
605
606 This is a multi-node call.
607
608 """
609 return self._MultiNodeCall(node_list, "vg_list", [])
610
611 @_RpcTimeout(_TMO_NORMAL)
613 """Get list of storage units.
614
615 This is a multi-node call.
616
617 """
618 return self._MultiNodeCall(node_list, "storage_list",
619 [su_name, su_args, name, fields])
620
621 @_RpcTimeout(_TMO_NORMAL)
623 """Modify a storage unit.
624
625 This is a single-node call.
626
627 """
628 return self._SingleNodeCall(node, "storage_modify",
629 [su_name, su_args, name, changes])
630
631 @_RpcTimeout(_TMO_NORMAL)
633 """Executes an operation on a storage unit.
634
635 This is a single-node call.
636
637 """
638 return self._SingleNodeCall(node, "storage_execute",
639 [su_name, su_args, name, op])
640
641 @_RpcTimeout(_TMO_URGENT)
643 """Checks if a node has all the bridges given.
644
645 This method checks if all bridges given in the bridges_list are
646 present on the remote node, so that an instance that uses interfaces
647 on those bridges can be started.
648
649 This is a single-node call.
650
651 """
652 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
653
654 @_RpcTimeout(_TMO_NORMAL)
656 """Starts an instance.
657
658 This is a single-node call.
659
660 """
661 idict = self._InstDict(instance, hvp=hvp, bep=bep)
662 return self._SingleNodeCall(node, "instance_start", [idict])
663
664 @_RpcTimeout(_TMO_NORMAL)
666 """Stops an instance.
667
668 This is a single-node call.
669
670 """
671 return self._SingleNodeCall(node, "instance_shutdown",
672 [self._InstDict(instance), timeout])
673
674 @_RpcTimeout(_TMO_NORMAL)
676 """Gather the information necessary to prepare an instance migration.
677
678 This is a single-node call.
679
680 @type node: string
681 @param node: the node on which the instance is currently running
682 @type instance: C{objects.Instance}
683 @param instance: the instance definition
684
685 """
686 return self._SingleNodeCall(node, "migration_info",
687 [self._InstDict(instance)])
688
689 @_RpcTimeout(_TMO_NORMAL)
691 """Prepare a node to accept an instance.
692
693 This is a single-node call.
694
695 @type node: string
696 @param node: the target node for the migration
697 @type instance: C{objects.Instance}
698 @param instance: the instance definition
699 @type info: opaque/hypervisor specific (string/data)
700 @param info: result for the call_migration_info call
701 @type target: string
702 @param target: target hostname (usually ip address) (on the node itself)
703
704 """
705 return self._SingleNodeCall(node, "accept_instance",
706 [self._InstDict(instance), info, target])
707
708 @_RpcTimeout(_TMO_NORMAL)
710 """Finalize any target-node migration specific operation.
711
712 This is called both in case of a successful migration and in case of error
713 (in which case it should abort the migration).
714
715 This is a single-node call.
716
717 @type node: string
718 @param node: the target node for the migration
719 @type instance: C{objects.Instance}
720 @param instance: the instance definition
721 @type info: opaque/hypervisor specific (string/data)
722 @param info: result for the call_migration_info call
723 @type success: boolean
724 @param success: whether the migration was a success or a failure
725
726 """
727 return self._SingleNodeCall(node, "finalize_migration",
728 [self._InstDict(instance), info, success])
729
730 @_RpcTimeout(_TMO_SLOW)
732 """Migrate an instance.
733
734 This is a single-node call.
735
736 @type node: string
737 @param node: the node on which the instance is currently running
738 @type instance: C{objects.Instance}
739 @param instance: the instance definition
740 @type target: string
741 @param target: the target node name
742 @type live: boolean
743 @param live: whether the migration should be done live or not (the
744 interpretation of this parameter is left to the hypervisor)
745
746 """
747 return self._SingleNodeCall(node, "instance_migrate",
748 [self._InstDict(instance), target, live])
749
750 @_RpcTimeout(_TMO_NORMAL)
752 """Reboots an instance.
753
754 This is a single-node call.
755
756 """
757 return self._SingleNodeCall(node, "instance_reboot",
758 [self._InstDict(inst), reboot_type,
759 shutdown_timeout])
760
761 @_RpcTimeout(_TMO_1DAY)
763 """Installs an OS on the given instance.
764
765 This is a single-node call.
766
767 """
768 return self._SingleNodeCall(node, "instance_os_add",
769 [self._InstDict(inst, osp=osparams),
770 reinstall, debug])
771
772 @_RpcTimeout(_TMO_SLOW)
774 """Run the OS rename script for an instance.
775
776 This is a single-node call.
777
778 """
779 return self._SingleNodeCall(node, "instance_run_rename",
780 [self._InstDict(inst), old_name, debug])
781
782 @_RpcTimeout(_TMO_URGENT)
784 """Returns information about a single instance.
785
786 This is a single-node call.
787
788 @type node: list
789 @param node: the list of nodes to query
790 @type instance: string
791 @param instance: the instance name
792 @type hname: string
793 @param hname: the hypervisor type of the instance
794
795 """
796 return self._SingleNodeCall(node, "instance_info", [instance, hname])
797
798 @_RpcTimeout(_TMO_NORMAL)
800 """Checks whether the given instance can be migrated.
801
802 This is a single-node call.
803
804 @param node: the node to query
805 @type instance: L{objects.Instance}
806 @param instance: the instance to check
807
808
809 """
810 return self._SingleNodeCall(node, "instance_migratable",
811 [self._InstDict(instance)])
812
813 @_RpcTimeout(_TMO_URGENT)
815 """Returns information about all instances on the given nodes.
816
817 This is a multi-node call.
818
819 @type node_list: list
820 @param node_list: the list of nodes to query
821 @type hypervisor_list: list
822 @param hypervisor_list: the hypervisors to query for instances
823
824 """
825 return self._MultiNodeCall(node_list, "all_instances_info",
826 [hypervisor_list])
827
828 @_RpcTimeout(_TMO_URGENT)
830 """Returns the list of running instances on a given node.
831
832 This is a multi-node call.
833
834 @type node_list: list
835 @param node_list: the list of nodes to query
836 @type hypervisor_list: list
837 @param hypervisor_list: the hypervisors to query for instances
838
839 """
840 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
841
842 @_RpcTimeout(_TMO_FAST)
845 """Do a TcpPing on the remote node
846
847 This is a single-node call.
848
849 """
850 return self._SingleNodeCall(node, "node_tcp_ping",
851 [source, target, port, timeout,
852 live_port_needed])
853
854 @_RpcTimeout(_TMO_FAST)
856 """Checks if a node has the given IP address.
857
858 This is a single-node call.
859
860 """
861 return self._SingleNodeCall(node, "node_has_ip_address", [address])
862
863 @_RpcTimeout(_TMO_URGENT)
865 """Return node information.
866
867 This will return memory information and volume group size and free
868 space.
869
870 This is a multi-node call.
871
872 @type node_list: list
873 @param node_list: the list of nodes to query
874 @type vg_name: C{string}
875 @param vg_name: the name of the volume group to ask for disk space
876 information
877 @type hypervisor_type: C{str}
878 @param hypervisor_type: the name of the hypervisor to ask for
879 memory information
880
881 """
882 return self._MultiNodeCall(node_list, "node_info",
883 [vg_name, hypervisor_type])
884
885 @_RpcTimeout(_TMO_NORMAL)
887 """Modify hosts file with name
888
889 @type node: string
890 @param node: The node to call
891 @type mode: string
892 @param mode: The mode to operate. Currently "add" or "remove"
893 @type name: string
894 @param name: The host name to be modified
895 @type ip: string
896 @param ip: The ip of the entry (just valid if mode is "add")
897
898 """
899 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
900
901 @_RpcTimeout(_TMO_NORMAL)
903 """Request verification of given parameters.
904
905 This is a multi-node call.
906
907 """
908 return self._MultiNodeCall(node_list, "node_verify",
909 [checkdict, cluster_name])
910
911 @classmethod
912 @_RpcTimeout(_TMO_FAST)
914 """Tells a node to activate itself as a master.
915
916 This is a single-node call.
917
918 """
919 return cls._StaticSingleNodeCall(node, "node_start_master",
920 [start_daemons, no_voting])
921
922 @classmethod
923 @_RpcTimeout(_TMO_FAST)
925 """Tells a node to demote itself from master status.
926
927 This is a single-node call.
928
929 """
930 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
931
932 @classmethod
933 @_RpcTimeout(_TMO_URGENT)
935 """Query master info.
936
937 This is a multi-node call.
938
939 """
940
941 return cls._StaticMultiNodeCall(node_list, "master_info", [])
942
943 @classmethod
944 @_RpcTimeout(_TMO_URGENT)
946 """Query node version.
947
948 This is a multi-node call.
949
950 """
951 return cls._StaticMultiNodeCall(node_list, "version", [])
952
953 @_RpcTimeout(_TMO_NORMAL)
955 """Request creation of a given block device.
956
957 This is a single-node call.
958
959 """
960 return self._SingleNodeCall(node, "blockdev_create",
961 [bdev.ToDict(), size, owner, on_primary, info])
962
963 @_RpcTimeout(_TMO_SLOW)
965 """Request wipe at given offset with given size of a block device.
966
967 This is a single-node call.
968
969 """
970 return self._SingleNodeCall(node, "blockdev_wipe",
971 [bdev.ToDict(), offset, size])
972
973 @_RpcTimeout(_TMO_NORMAL)
975 """Request removal of a given block device.
976
977 This is a single-node call.
978
979 """
980 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
981
982 @_RpcTimeout(_TMO_NORMAL)
984 """Request rename of the given block devices.
985
986 This is a single-node call.
987
988 """
989 return self._SingleNodeCall(node, "blockdev_rename",
990 [(d.ToDict(), uid) for d, uid in devlist])
991
992 @_RpcTimeout(_TMO_NORMAL)
994 """Request a pause/resume of given block device.
995
996 This is a single-node call.
997
998 """
999 return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1000 [[bdev.ToDict() for bdev in disks], pause])
1001
1002 @_RpcTimeout(_TMO_NORMAL)
1004 """Request assembling of a given block device.
1005
1006 This is a single-node call.
1007
1008 """
1009 return self._SingleNodeCall(node, "blockdev_assemble",
1010 [disk.ToDict(), owner, on_primary, idx])
1011
1012 @_RpcTimeout(_TMO_NORMAL)
1014 """Request shutdown of a given block device.
1015
1016 This is a single-node call.
1017
1018 """
1019 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1020
1021 @_RpcTimeout(_TMO_NORMAL)
1023 """Request adding a list of children to a (mirroring) device.
1024
1025 This is a single-node call.
1026
1027 """
1028 return self._SingleNodeCall(node, "blockdev_addchildren",
1029 [bdev.ToDict(),
1030 [disk.ToDict() for disk in ndevs]])
1031
1032 @_RpcTimeout(_TMO_NORMAL)
1034 """Request removing a list of children from a (mirroring) device.
1035
1036 This is a single-node call.
1037
1038 """
1039 return self._SingleNodeCall(node, "blockdev_removechildren",
1040 [bdev.ToDict(),
1041 [disk.ToDict() for disk in ndevs]])
1042
1043 @_RpcTimeout(_TMO_NORMAL)
1045 """Request status of a (mirroring) device.
1046
1047 This is a single-node call.
1048
1049 """
1050 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1051 [dsk.ToDict() for dsk in disks])
1052 if not result.fail_msg:
1053 result.payload = [objects.BlockDevStatus.FromDict(i)
1054 for i in result.payload]
1055 return result
1056
1057 @_RpcTimeout(_TMO_NORMAL)
1059 """Request status of (mirroring) devices from multiple nodes.
1060
1061 This is a multi-node call.
1062
1063 """
1064 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1065 [dict((name, [dsk.ToDict() for dsk in disks])
1066 for name, disks in node_disks.items())])
1067 for nres in result.values():
1068 if nres.fail_msg:
1069 continue
1070
1071 for idx, (success, status) in enumerate(nres.payload):
1072 if success:
1073 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1074
1075 return result
1076
1077 @_RpcTimeout(_TMO_NORMAL)
1079 """Request identification of a given block device.
1080
1081 This is a single-node call.
1082
1083 """
1084 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1085 if not result.fail_msg and result.payload is not None:
1086 result.payload = objects.BlockDevStatus.FromDict(result.payload)
1087 return result
1088
1089 @_RpcTimeout(_TMO_NORMAL)
1091 """Closes the given block devices.
1092
1093 This is a single-node call.
1094
1095 """
1096 params = [instance_name, [cf.ToDict() for cf in disks]]
1097 return self._SingleNodeCall(node, "blockdev_close", params)
1098
1099 @_RpcTimeout(_TMO_NORMAL)
1101 """Returns the size of the given disks.
1102
1103 This is a single-node call.
1104
1105 """
1106 params = [[cf.ToDict() for cf in disks]]
1107 return self._SingleNodeCall(node, "blockdev_getsize", params)
1108
1109 @_RpcTimeout(_TMO_NORMAL)
1111 """Disconnects the network of the given drbd devices.
1112
1113 This is a multi-node call.
1114
1115 """
1116 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1117 [nodes_ip, [cf.ToDict() for cf in disks]])
1118
1119 @_RpcTimeout(_TMO_NORMAL)
1122 """Disconnects the given drbd devices.
1123
1124 This is a multi-node call.
1125
1126 """
1127 return self._MultiNodeCall(node_list, "drbd_attach_net",
1128 [nodes_ip, [cf.ToDict() for cf in disks],
1129 instance_name, multimaster])
1130
1131 @_RpcTimeout(_TMO_SLOW)
1133 """Waits for the synchronization of drbd devices is complete.
1134
1135 This is a multi-node call.
1136
1137 """
1138 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1139 [nodes_ip, [cf.ToDict() for cf in disks]])
1140
1141 @_RpcTimeout(_TMO_URGENT)
1143 """Gets drbd helper.
1144
1145 This is a multi-node call.
1146
1147 """
1148 return self._MultiNodeCall(node_list, "drbd_helper", [])
1149
1150 @classmethod
1151 @_RpcTimeout(_TMO_NORMAL)
1153 """Upload a file.
1154
1155 The node will refuse the operation in case the file is not on the
1156 approved file list.
1157
1158 This is a multi-node call.
1159
1160 @type node_list: list
1161 @param node_list: the list of node names to upload to
1162 @type file_name: str
1163 @param file_name: the filename to upload
1164 @type address_list: list or None
1165 @keyword address_list: an optional list of node addresses, in order
1166 to optimize the RPC speed
1167
1168 """
1169 file_contents = utils.ReadFile(file_name)
1170 data = cls._Compress(file_contents)
1171 st = os.stat(file_name)
1172 getents = runtime.GetEnts()
1173 params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1174 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1175 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1176 address_list=address_list)
1177
1178 @classmethod
1179 @_RpcTimeout(_TMO_NORMAL)
1181 """Write ssconf files.
1182
1183 This is a multi-node call.
1184
1185 """
1186 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1187
1188 @_RpcTimeout(_TMO_NORMAL)
1189 - def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1190 """Runs OOB.
1191
1192 This is a single-node call.
1193
1194 """
1195 return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1196 remote_node, timeout])
1197
1198 @_RpcTimeout(_TMO_FAST)
1200 """Request a diagnose of OS definitions.
1201
1202 This is a multi-node call.
1203
1204 """
1205 return self._MultiNodeCall(node_list, "os_diagnose", [])
1206
1207 @_RpcTimeout(_TMO_FAST)
1209 """Returns an OS definition.
1210
1211 This is a single-node call.
1212
1213 """
1214 result = self._SingleNodeCall(node, "os_get", [name])
1215 if not result.fail_msg and isinstance(result.payload, dict):
1216 result.payload = objects.OS.FromDict(result.payload)
1217 return result
1218
1219 @_RpcTimeout(_TMO_FAST)
1221 """Run a validation routine for a given OS.
1222
1223 This is a multi-node call.
1224
1225 """
1226 return self._MultiNodeCall(nodes, "os_validate",
1227 [required, name, checks, params])
1228
1229 @_RpcTimeout(_TMO_NORMAL)
1231 """Call the hooks runner.
1232
1233 Args:
1234 - op: the OpCode instance
1235 - env: a dictionary with the environment
1236
1237 This is a multi-node call.
1238
1239 """
1240 params = [hpath, phase, env]
1241 return self._MultiNodeCall(node_list, "hooks_runner", params)
1242
1243 @_RpcTimeout(_TMO_NORMAL)
1245 """Call an iallocator on a remote node
1246
1247 Args:
1248 - name: the iallocator name
1249 - input: the json-encoded input string
1250
1251 This is a single-node call.
1252
1253 """
1254 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1255
1256 @_RpcTimeout(_TMO_NORMAL)
1258 """Request a snapshot of the given block device.
1259
1260 This is a single-node call.
1261
1262 """
1263 return self._SingleNodeCall(node, "blockdev_grow",
1264 [cf_bdev.ToDict(), amount])
1265
1266 @_RpcTimeout(_TMO_1DAY)
1269 """Export a given disk to another node.
1270
1271 This is a single-node call.
1272
1273 """
1274 return self._SingleNodeCall(node, "blockdev_export",
1275 [cf_bdev.ToDict(), dest_node, dest_path,
1276 cluster_name])
1277
1278 @_RpcTimeout(_TMO_NORMAL)
1280 """Request a snapshot of the given block device.
1281
1282 This is a single-node call.
1283
1284 """
1285 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1286
1287 @_RpcTimeout(_TMO_NORMAL)
1289 """Request the completion of an export operation.
1290
1291 This writes the export config file, etc.
1292
1293 This is a single-node call.
1294
1295 """
1296 flat_disks = []
1297 for disk in snap_disks:
1298 if isinstance(disk, bool):
1299 flat_disks.append(disk)
1300 else:
1301 flat_disks.append(disk.ToDict())
1302
1303 return self._SingleNodeCall(node, "finalize_export",
1304 [self._InstDict(instance), flat_disks])
1305
1306 @_RpcTimeout(_TMO_FAST)
1308 """Queries the export information in a given path.
1309
1310 This is a single-node call.
1311
1312 """
1313 return self._SingleNodeCall(node, "export_info", [path])
1314
1315 @_RpcTimeout(_TMO_FAST)
1317 """Gets the stored exports list.
1318
1319 This is a multi-node call.
1320
1321 """
1322 return self._MultiNodeCall(node_list, "export_list", [])
1323
1324 @_RpcTimeout(_TMO_FAST)
1326 """Requests removal of a given export.
1327
1328 This is a single-node call.
1329
1330 """
1331 return self._SingleNodeCall(node, "export_remove", [export])
1332
1333 @classmethod
1334 @_RpcTimeout(_TMO_NORMAL)
1336 """Requests a node to clean the cluster information it has.
1337
1338 This will remove the configuration information from the ganeti data
1339 dir.
1340
1341 This is a single-node call.
1342
1343 """
1344 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1345 [modify_ssh_setup])
1346
1347 @_RpcTimeout(_TMO_FAST)
1349 """Gets all volumes on node(s).
1350
1351 This is a multi-node call.
1352
1353 """
1354 return self._MultiNodeCall(node_list, "node_volumes", [])
1355
1356 @_RpcTimeout(_TMO_FAST)
1358 """Demote a node from the master candidate role.
1359
1360 This is a single-node call.
1361
1362 """
1363 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1364
1365 @_RpcTimeout(_TMO_NORMAL)
1367 """Tries to powercycle a node.
1368
1369 This is a single-node call.
1370
1371 """
1372 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1373
1374 @_RpcTimeout(None)
1376 """Sleep for a fixed time on given node(s).
1377
1378 This is a multi-node call.
1379
1380 """
1381 return self._MultiNodeCall(node_list, "test_delay", [duration],
1382 read_timeout=int(duration + 5))
1383
1384 @_RpcTimeout(_TMO_FAST)
1386 """Create the given file storage directory.
1387
1388 This is a single-node call.
1389
1390 """
1391 return self._SingleNodeCall(node, "file_storage_dir_create",
1392 [file_storage_dir])
1393
1394 @_RpcTimeout(_TMO_FAST)
1396 """Remove the given file storage directory.
1397
1398 This is a single-node call.
1399
1400 """
1401 return self._SingleNodeCall(node, "file_storage_dir_remove",
1402 [file_storage_dir])
1403
1404 @_RpcTimeout(_TMO_FAST)
1407 """Rename file storage directory.
1408
1409 This is a single-node call.
1410
1411 """
1412 return self._SingleNodeCall(node, "file_storage_dir_rename",
1413 [old_file_storage_dir, new_file_storage_dir])
1414
1415 @classmethod
1416 @_RpcTimeout(_TMO_URGENT)
1418 """Update job queue.
1419
1420 This is a multi-node call.
1421
1422 """
1423 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1424 [file_name, cls._Compress(content)],
1425 address_list=address_list)
1426
1427 @classmethod
1428 @_RpcTimeout(_TMO_NORMAL)
1430 """Purge job queue.
1431
1432 This is a single-node call.
1433
1434 """
1435 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1436
1437 @classmethod
1438 @_RpcTimeout(_TMO_URGENT)
1440 """Rename a job queue file.
1441
1442 This is a multi-node call.
1443
1444 """
1445 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1446 address_list=address_list)
1447
1448 @_RpcTimeout(_TMO_NORMAL)
1450 """Validate the hypervisor params.
1451
1452 This is a multi-node call.
1453
1454 @type node_list: list
1455 @param node_list: the list of nodes to query
1456 @type hvname: string
1457 @param hvname: the hypervisor name
1458 @type hvparams: dict
1459 @param hvparams: the hypervisor parameters to be validated
1460
1461 """
1462 cluster = self._cfg.GetClusterInfo()
1463 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1464 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1465 [hvname, hv_full])
1466
1467 @_RpcTimeout(_TMO_NORMAL)
1469 """Creates a new X509 certificate for SSL/TLS.
1470
1471 This is a single-node call.
1472
1473 @type validity: int
1474 @param validity: Validity in seconds
1475
1476 """
1477 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1478
1479 @_RpcTimeout(_TMO_NORMAL)
1481 """Removes a X509 certificate.
1482
1483 This is a single-node call.
1484
1485 @type name: string
1486 @param name: Certificate name
1487
1488 """
1489 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1490
1491 @_RpcTimeout(_TMO_NORMAL)
1493 """Starts a listener for an import.
1494
1495 This is a single-node call.
1496
1497 @type node: string
1498 @param node: Node name
1499 @type instance: C{objects.Instance}
1500 @param instance: Instance object
1501
1502 """
1503 return self._SingleNodeCall(node, "import_start",
1504 [opts.ToDict(),
1505 self._InstDict(instance), dest,
1506 _EncodeImportExportIO(dest, dest_args)])
1507
1508 @_RpcTimeout(_TMO_NORMAL)
1509 - def call_export_start(self, node, opts, host, port,
1510 instance, source, source_args):
1511 """Starts an export daemon.
1512
1513 This is a single-node call.
1514
1515 @type node: string
1516 @param node: Node name
1517 @type instance: C{objects.Instance}
1518 @param instance: Instance object
1519
1520 """
1521 return self._SingleNodeCall(node, "export_start",
1522 [opts.ToDict(), host, port,
1523 self._InstDict(instance), source,
1524 _EncodeImportExportIO(source, source_args)])
1525
1526 @_RpcTimeout(_TMO_FAST)
1528 """Gets the status of an import or export.
1529
1530 This is a single-node call.
1531
1532 @type node: string
1533 @param node: Node name
1534 @type names: List of strings
1535 @param names: Import/export names
1536 @rtype: List of L{objects.ImportExportStatus} instances
1537 @return: Returns a list of the state of each named import/export or None if
1538 a status couldn't be retrieved
1539
1540 """
1541 result = self._SingleNodeCall(node, "impexp_status", [names])
1542
1543 if not result.fail_msg:
1544 decoded = []
1545
1546 for i in result.payload:
1547 if i is None:
1548 decoded.append(None)
1549 continue
1550 decoded.append(objects.ImportExportStatus.FromDict(i))
1551
1552 result.payload = decoded
1553
1554 return result
1555
1556 @_RpcTimeout(_TMO_NORMAL)
1558 """Aborts an import or export.
1559
1560 This is a single-node call.
1561
1562 @type node: string
1563 @param node: Node name
1564 @type name: string
1565 @param name: Import/export name
1566
1567 """
1568 return self._SingleNodeCall(node, "impexp_abort", [name])
1569
1570 @_RpcTimeout(_TMO_NORMAL)
1572 """Cleans up after an import or export.
1573
1574 This is a single-node call.
1575
1576 @type node: string
1577 @param node: Node name
1578 @type name: string
1579 @param name: Import/export name
1580
1581 """
1582 return self._SingleNodeCall(node, "impexp_cleanup", [name])
1583