1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49
50 import ganeti.http.client
51
52
53
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
58 "Expect:",
59 ]
60
61
62 _TMO_URGENT = 60
63 _TMO_FAST = 5 * 60
64 _TMO_NORMAL = 15 * 60
65 _TMO_SLOW = 3600
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69
70
71
72
73
74
75
76 _TIMEOUTS = {
77 }
81 """Initializes the module-global HTTP client manager.
82
83 Must be called before using any RPC function and while exactly one thread is
84 running.
85
86 """
87
88
89
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
92
93 logging.info("Using PycURL %s", pycurl.version)
94
95 pycurl.global_init(pycurl.GLOBAL_ALL)
96
99 """Stops the module-global HTTP client manager.
100
101 Must be called before quitting the program and while exactly one thread is
102 running.
103
104 """
105 pycurl.global_cleanup()
106
109 noded_cert = str(constants.NODED_CERT_FILE)
110
111 curl.setopt(pycurl.FOLLOWLOCATION, False)
112 curl.setopt(pycurl.CAINFO, noded_cert)
113 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114 curl.setopt(pycurl.SSL_VERIFYPEER, True)
115 curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116 curl.setopt(pycurl.SSLCERT, noded_cert)
117 curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118 curl.setopt(pycurl.SSLKEY, noded_cert)
119 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122
123
124 _threading = threading
129 """Returns a per-thread HTTP client pool.
130
131 @rtype: L{http.client.HttpClientPool}
132
133 """
134 try:
135 pool = self.hcp
136 except AttributeError:
137 pool = http.client.HttpClientPool(_ConfigRpcCurl)
138 self.hcp = pool
139
140 return pool
141
142
143
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
151 """Timeout decorator.
152
153 When applied to a rpc call_* function, it updates the global timeout
154 table with the given function/timeout.
155
156 """
157 def decorator(f):
158 name = f.__name__
159 assert name.startswith("call_")
160 _TIMEOUTS[name[len("call_"):]] = secs
161 return f
162 return decorator
163
166 """RPC-wrapper decorator.
167
168 When applied to a function, it runs it with the RPC system
169 initialized, and it shuts down the system afterwards. This means the
170 function must be called without RPC being initialized.
171
172 """
173 def wrapper(*args, **kwargs):
174 Init()
175 try:
176 return fn(*args, **kwargs)
177 finally:
178 Shutdown()
179 return wrapper
180
183 """RPC Result class.
184
185 This class holds an RPC result. It is needed since in multi-node
186 calls we can't raise an exception just because one out of many
187 failed, and therefore we use this class to encapsulate the result.
188
189 @ivar data: the data payload, for successful results, or None
190 @ivar call: the name of the RPC call
191 @ivar node: the name of the node to which we made the call
192 @ivar offline: whether the operation failed because the node was
193 offline, as opposed to actual failure; offline=True will always
194 imply failed=True, in order to allow simpler checking if
195 the user doesn't care about the exact failure mode
196 @ivar fail_msg: the error message if the call failed
197
198 """
def __init__(self, data=None, failed=False, offline=False,
             call=None, node=None):
    """Record the outcome of one RPC call made to one node.

    Three situations are distinguished: the node was marked offline, the
    call failed before a reply could be decoded, or a reply was received
    and must be validated as a two-element C{(status, payload)} sequence.

    @param data: the decoded reply of a completed call, or the error
        information when C{failed} is set
    @type failed: boolean
    @param failed: whether the call failed before a reply was decoded
    @type offline: boolean
    @param offline: whether the node is marked offline
    @param call: the name of the RPC call
    @param node: the name of the node the call was made to

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline or failed:
        # Neither case carries a usable reply; only the message differs,
        # and the offline marker wins over a plain failure.
        self.data = self.payload = None
        if offline:
            self.fail_msg = "Node is marked offline"
        else:
            self.fail_msg = self._EnsureErr(data)
    else:
        self.data = data
        # Stay without a payload unless the reply proves to be a success
        self.payload = None
        if not isinstance(data, (tuple, list)):
            self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                             type(data))
        elif len(data) != 2:
            self.fail_msg = ("RPC layer error: invalid result length (%d), "
                             "expected 2" % len(data))
        else:
            (status, value) = data
            if status:
                # Remote side reported success, the value is the payload
                self.fail_msg = None
                self.payload = value
            else:
                # Remote side reported failure, the value is the error
                self.fail_msg = self._EnsureErr(value)

    # Every branch above must have populated the complete attribute set
    for attr in ("call", "data", "fail_msg", "node", "offline", "payload"):
        assert hasattr(self, attr)
235
236 @staticmethod
238 """Helper to ensure we return a 'True' value for error."""
239 if val:
240 return val
241 else:
242 return "No error information"
243
def Raise(self, msg, prereq=False, ecode=None):
    """Turn a failed result into an exception; do nothing on success.

    This lets LU code call a single method instead of checking every
    result by hand.

    @param msg: text to prefix the failure message with; when empty, a
        generic message naming the call and the node is used instead
    @type prereq: boolean
    @param prereq: raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: optional error code, passed as second exception argument
    @raise errors.OpPrereqError: on failure, when C{prereq} is true
    @raise errors.OpExecError: on failure, when C{prereq} is false

    """
    failure = self.fail_msg
    if not failure:
        return

    if msg:
        text = "%s: %s" % (msg, failure)
    else:
        text = ("Call '%s' to node '%s' has failed: %s" %
                (self.call, self.node, failure))

    if not prereq:
        exc_class = errors.OpExecError
    else:
        exc_class = errors.OpPrereqError

    # The error code is only passed along when the caller supplied one
    if ecode is None:
        raise exc_class(text)
    raise exc_class(text, ecode)
268
273 """Return addresses for given node names.
274
275 @type node_list: list
276 @param node_list: List of node names
277 @type ssc: class
278 @param ssc: SimpleStore class that is used to obtain node->ip mappings
279 @type nslookup_fn: callable
280 @param nslookup_fn: function used to do NS lookup
281 @rtype: list of addresses and/or None's
282 @returns: List of corresponding addresses, if found
283
284 """
285 ss = ssc()
286 iplist = ss.GetNodePrimaryIPList()
287 family = ss.GetPrimaryIPFamily()
288 addresses = []
289 ipmap = dict(entry.split() for entry in iplist)
290 for node in node_list:
291 address = ipmap.get(node)
292 if address is None:
293 address = nslookup_fn(node, family=family)
294 addresses.append(address)
295
296 return addresses
297
300 """RPC Client class.
301
302 This class, given a (remote) method name, a list of parameters and a
303 list of nodes, will contact (in parallel) all nodes, and return a
304 dict of results (key: node name, value: result).
305
306 One current bug is that generic failure is still signaled by
307 'False' result, which is not good. This overloading of values can
308 cause bugs.
309
310 """
312 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313 " timeouts table")
314 self.procedure = procedure
315 self.body = body
316 self.port = port
317 self._request = {}
318 self._address_lookup_fn = address_lookup_fn
319
def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Register several nodes as targets of this call.

    @type node_list: list
    @param node_list: the names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: the node addresses, in the same order and of
        the same length as C{node_list}; when None, the addresses are
        obtained from the client's address lookup function
    @type read_timeout: int
    @param read_timeout: overrides the default timeout of the operation

    """
    addresses = address_list
    if addresses is None:
        # Nothing supplied by the caller, resolve all names in one go
        addresses = self._address_lookup_fn(node_list)

    assert len(node_list) == len(addresses), \
        "Name and address lists must have the same length"

    for (node_name, node_address) in zip(node_list, addresses):
        self.ConnectNode(node_name, node_address, read_timeout=read_timeout)
341
def ConnectNode(self, name, address=None, read_timeout=None):
    """Register a single node as a target of this call.

    An HTTP PUT request for the client's procedure is prepared and stored
    under the node name; nothing is sent at this point.

    @type name: str
    @param name: the node name
    @type address: str
    @param address: the node address, if known; when None it is obtained
        from the client's address lookup function
    @type read_timeout: int
    @param read_timeout: overrides the default timeout of the operation

    """
    target = address
    if target is None:
        # Resolve just this one name through the configured lookup
        target = self._address_lookup_fn([name])[0]

    assert target is not None

    timeout = read_timeout
    if timeout is None:
        # Fall back to the timeout declared for this procedure
        timeout = _TIMEOUTS[self.procedure]

    url_path = str("/%s" % self.procedure)
    request = http.client.HttpClientRequest(str(target), self.port,
                                            http.HTTP_PUT, url_path,
                                            headers=_RPC_CLIENT_HEADERS,
                                            post_data=str(self.body),
                                            read_timeout=timeout)
    self._request[name] = request
368
370 """Call nodes and return results.
371
372 @rtype: list
373 @return: List of RPC results
374
375 """
376 if not http_pool:
377 http_pool = _thread_local.GetHttpClientPool()
378
379 http_pool.ProcessRequests(self._request.values())
380
381 results = {}
382
383 for name, req in self._request.iteritems():
384 if req.success and req.resp_status_code == http.HTTP_OK:
385 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386 node=name, call=self.procedure)
387 continue
388
389
390 if req.error:
391 msg = req.error
392 else:
393 msg = req.resp_body
394
395 logging.error("RPC error in %s from node %s: %s",
396 self.procedure, name, msg)
397 results[name] = RpcResult(data=msg, failed=True, node=name,
398 call=self.procedure)
399
400 return results
401
404 """Encodes import/export I/O information.
405
406 """
407 if ieio == constants.IEIO_RAW_DISK:
408 assert len(ieioargs) == 1
409 return (ieioargs[0].ToDict(), )
410
411 if ieio == constants.IEIO_SCRIPT:
412 assert len(ieioargs) == 2
413 return (ieioargs[0].ToDict(), ieioargs[1])
414
415 return ieioargs
416
419 """RPC runner class"""
420
422 """Initializes the RPC runner.
423
424 @type cfg: C{config.ConfigWriter}
425 @param cfg: the configuration object that will be used to get data
426 about the cluster
427
428 """
429 self._cfg = cfg
430 self.port = netutils.GetDaemonPort(constants.NODED)
431
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Serialize an instance with its parameters filled in.

    Starting from the instance's own ToDict() output, the hypervisor,
    backend and OS parameters are replaced by their cluster-filled
    versions, the given overrides are applied on top, and the parameters
    of every NIC are filled from the cluster's default NIC parameters.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict with filled hypervisor, backend, OS and
        NIC parameters

    """
    result = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hvparams = cluster.FillHV(instance)
    if hvp is not None:
        hvparams.update(hvp)
    result["hvparams"] = hvparams

    beparams = cluster.FillBE(instance)
    if bep is not None:
        beparams.update(bep)
    result["beparams"] = beparams

    osparams = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
        osparams.update(osp)
    result["osparams"] = osparams

    for nic_dict in result["nics"]:
        # The defaults are looked up per NIC on purpose, so an instance
        # without NICs never touches the cluster's NIC parameters
        nic_defaults = cluster.nicparams[constants.PP_DEFAULT]
        nic_dict['nicparams'] = objects.FillDict(nic_defaults,
                                                 nic_dict['nicparams'])

    return result
467
def _ConnectList(self, client, node_list, call, read_timeout=None):
    """Connect a client to the given nodes, leaving out offline ones.

    Nodes known to the configuration are connected using their primary
    IP; unknown nodes are connected without an address. Nodes marked
    offline are never connected and get a ready-made offline result.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node_list: list
    @param node_list: the node list we should connect
    @type call: string
    @param call: the name of the remote procedure call, used for filling
        in the results of offline nodes
    @type read_timeout: int
    @param read_timeout: overrides the default read timeout of the
        given operation
    @rtype: dict
    @return: offline results for the skipped nodes, keyed by node name

    """
    known_nodes = self._cfg.GetAllNodesInfo()
    offline_results = {}
    names = []
    addresses = []

    for node in node_list:
        address = None
        if node in known_nodes:
            info = known_nodes[node]
            if info.offline:
                offline_results[node] = RpcResult(node=node, offline=True,
                                                  call=call)
                continue
            address = info.primary_ip
        names.append(node)
        addresses.append(address)

    # Leave the client untouched when every node was skipped
    if names:
        client.ConnectList(names, address_list=addresses,
                           read_timeout=read_timeout)

    return offline_results
501
def _ConnectNode(self, client, node, call, read_timeout=None):
    """Connect a client to one node, unless that node is offline.

    A node known to the configuration is connected using its primary IP;
    an unknown node is connected without an address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node: str
    @param node: the node we should connect
    @type call: string
    @param call: the name of the remote procedure call, used for filling
        in the result of an offline node
    @type read_timeout: int
    @param read_timeout: overrides the default read timeout of the
        given operation
    @rtype: L{RpcResult} or None
    @return: an offline result if the node is marked offline (the client
        is not touched in that case), None once the node was connected

    """
    info = self._cfg.GetNodeInfo(node)
    address = None
    if info is not None:
        if info.offline:
            return RpcResult(node=node, offline=True, call=call)
        address = info.primary_ip

    client.ConnectNode(node, address=address, read_timeout=read_timeout)
525
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Run one procedure on several nodes and gather all results.

    @type node_list: list
    @param node_list: the names of the nodes to call
    @type procedure: string
    @param procedure: the name of the remote procedure
    @param args: the procedure arguments, sent JSON-encoded as the body
    @type read_timeout: int
    @param read_timeout: overrides the default read timeout of the
        given operation
    @rtype: dict
    @return: one result per node, keyed by node name; offline nodes get
        their offline result without being contacted

    """
    payload = serializer.DumpJson(args, indent=False)
    rpc_client = Client(procedure, payload, self.port)

    # Start from the offline results, then add what the nodes answered
    results = self._ConnectList(rpc_client, node_list, procedure,
                                read_timeout=read_timeout)
    results.update(rpc_client.GetResults())
    return results
536
537 @classmethod
538 - def _StaticMultiNodeCall(cls, node_list, procedure, args,
539 address_list=None, read_timeout=None):
548
550 """Helper for making a single-node call
551
552 """
553 body = serializer.DumpJson(args, indent=False)
554 c = Client(procedure, body, self.port)
555 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556 if result is None:
557
558 result = c.GetResults()[node]
559 return result
560
561 @classmethod
570
571 @staticmethod
573 """Compresses a string for transport over RPC.
574
575 Small amounts of data are not compressed.
576
577 @type data: str
578 @param data: Data
579 @rtype: tuple
580 @return: Encoded data to send
581
582 """
583
584 if len(data) < 512:
585 return (constants.RPC_ENCODING_NONE, data)
586
587
588 return (constants.RPC_ENCODING_ZLIB_BASE64,
589 base64.b64encode(zlib.compress(data, 3)))
590
591
592
593
594
595 @_RpcTimeout(_TMO_URGENT)
597 """Gets the logical volumes present in a given volume group.
598
599 This is a multi-node call.
600
601 """
602 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603
604 @_RpcTimeout(_TMO_URGENT)
606 """Gets the volume group list.
607
608 This is a multi-node call.
609
610 """
611 return self._MultiNodeCall(node_list, "vg_list", [])
612
613 @_RpcTimeout(_TMO_NORMAL)
615 """Get list of storage units.
616
617 This is a multi-node call.
618
619 """
620 return self._MultiNodeCall(node_list, "storage_list",
621 [su_name, su_args, name, fields])
622
623 @_RpcTimeout(_TMO_NORMAL)
625 """Modify a storage unit.
626
627 This is a single-node call.
628
629 """
630 return self._SingleNodeCall(node, "storage_modify",
631 [su_name, su_args, name, changes])
632
633 @_RpcTimeout(_TMO_NORMAL)
635 """Executes an operation on a storage unit.
636
637 This is a single-node call.
638
639 """
640 return self._SingleNodeCall(node, "storage_execute",
641 [su_name, su_args, name, op])
642
643 @_RpcTimeout(_TMO_URGENT)
645 """Checks if a node has all the bridges given.
646
647 This method checks if all bridges given in the bridges_list are
648 present on the remote node, so that an instance that uses interfaces
649 on those bridges can be started.
650
651 This is a single-node call.
652
653 """
654 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655
656 @_RpcTimeout(_TMO_NORMAL)
658 """Starts an instance.
659
660 This is a single-node call.
661
662 """
663 idict = self._InstDict(instance, hvp=hvp, bep=bep)
664 return self._SingleNodeCall(node, "instance_start", [idict])
665
666 @_RpcTimeout(_TMO_NORMAL)
668 """Stops an instance.
669
670 This is a single-node call.
671
672 """
673 return self._SingleNodeCall(node, "instance_shutdown",
674 [self._InstDict(instance), timeout])
675
676 @_RpcTimeout(_TMO_NORMAL)
678 """Gather the information necessary to prepare an instance migration.
679
680 This is a single-node call.
681
682 @type node: string
683 @param node: the node on which the instance is currently running
684 @type instance: C{objects.Instance}
685 @param instance: the instance definition
686
687 """
688 return self._SingleNodeCall(node, "migration_info",
689 [self._InstDict(instance)])
690
691 @_RpcTimeout(_TMO_NORMAL)
693 """Prepare a node to accept an instance.
694
695 This is a single-node call.
696
697 @type node: string
698 @param node: the target node for the migration
699 @type instance: C{objects.Instance}
700 @param instance: the instance definition
701 @type info: opaque/hypervisor specific (string/data)
702 @param info: result for the call_migration_info call
703 @type target: string
704 @param target: target hostname (usually ip address) (on the node itself)
705
706 """
707 return self._SingleNodeCall(node, "accept_instance",
708 [self._InstDict(instance), info, target])
709
710 @_RpcTimeout(_TMO_NORMAL)
712 """Finalize any target-node migration specific operation.
713
714 This is called both in case of a successful migration and in case of error
715 (in which case it should abort the migration).
716
717 This is a single-node call.
718
719 @type node: string
720 @param node: the target node for the migration
721 @type instance: C{objects.Instance}
722 @param instance: the instance definition
723 @type info: opaque/hypervisor specific (string/data)
724 @param info: result for the call_migration_info call
725 @type success: boolean
726 @param success: whether the migration was a success or a failure
727
728 """
729 return self._SingleNodeCall(node, "finalize_migration",
730 [self._InstDict(instance), info, success])
731
732 @_RpcTimeout(_TMO_SLOW)
734 """Migrate an instance.
735
736 This is a single-node call.
737
738 @type node: string
739 @param node: the node on which the instance is currently running
740 @type instance: C{objects.Instance}
741 @param instance: the instance definition
742 @type target: string
743 @param target: the target node name
744 @type live: boolean
745 @param live: whether the migration should be done live or not (the
746 interpretation of this parameter is left to the hypervisor)
747
748 """
749 return self._SingleNodeCall(node, "instance_migrate",
750 [self._InstDict(instance), target, live])
751
752 @_RpcTimeout(_TMO_NORMAL)
754 """Reboots an instance.
755
756 This is a single-node call.
757
758 """
759 return self._SingleNodeCall(node, "instance_reboot",
760 [self._InstDict(inst), reboot_type,
761 shutdown_timeout])
762
763 @_RpcTimeout(_TMO_1DAY)
765 """Installs an OS on the given instance.
766
767 This is a single-node call.
768
769 """
770 return self._SingleNodeCall(node, "instance_os_add",
771 [self._InstDict(inst, osp=osparams),
772 reinstall, debug])
773
774 @_RpcTimeout(_TMO_SLOW)
776 """Run the OS rename script for an instance.
777
778 This is a single-node call.
779
780 """
781 return self._SingleNodeCall(node, "instance_run_rename",
782 [self._InstDict(inst), old_name, debug])
783
784 @_RpcTimeout(_TMO_URGENT)
786 """Returns information about a single instance.
787
788 This is a single-node call.
789
790 @type node: string
791 @param node: the node to query
792 @type instance: string
793 @param instance: the instance name
794 @type hname: string
795 @param hname: the hypervisor type of the instance
796
797 """
798 return self._SingleNodeCall(node, "instance_info", [instance, hname])
799
800 @_RpcTimeout(_TMO_NORMAL)
802 """Checks whether the given instance can be migrated.
803
804 This is a single-node call.
805
806 @param node: the node to query
807 @type instance: L{objects.Instance}
808 @param instance: the instance to check
809
810
811 """
812 return self._SingleNodeCall(node, "instance_migratable",
813 [self._InstDict(instance)])
814
815 @_RpcTimeout(_TMO_URGENT)
817 """Returns information about all instances on the given nodes.
818
819 This is a multi-node call.
820
821 @type node_list: list
822 @param node_list: the list of nodes to query
823 @type hypervisor_list: list
824 @param hypervisor_list: the hypervisors to query for instances
825
826 """
827 return self._MultiNodeCall(node_list, "all_instances_info",
828 [hypervisor_list])
829
830 @_RpcTimeout(_TMO_URGENT)
832 """Returns the list of running instances on a given node.
833
834 This is a multi-node call.
835
836 @type node_list: list
837 @param node_list: the list of nodes to query
838 @type hypervisor_list: list
839 @param hypervisor_list: the hypervisors to query for instances
840
841 """
842 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843
844 @_RpcTimeout(_TMO_FAST)
847 """Do a TcpPing on the remote node
848
849 This is a single-node call.
850
851 """
852 return self._SingleNodeCall(node, "node_tcp_ping",
853 [source, target, port, timeout,
854 live_port_needed])
855
856 @_RpcTimeout(_TMO_FAST)
858 """Checks if a node has the given IP address.
859
860 This is a single-node call.
861
862 """
863 return self._SingleNodeCall(node, "node_has_ip_address", [address])
864
865 @_RpcTimeout(_TMO_URGENT)
867 """Return node information.
868
869 This will return memory information and volume group size and free
870 space.
871
872 This is a multi-node call.
873
874 @type node_list: list
875 @param node_list: the list of nodes to query
876 @type vg_name: C{string}
877 @param vg_name: the name of the volume group to ask for disk space
878 information
879 @type hypervisor_type: C{str}
880 @param hypervisor_type: the name of the hypervisor to ask for
881 memory information
882
883 """
884 return self._MultiNodeCall(node_list, "node_info",
885 [vg_name, hypervisor_type])
886
887 @_RpcTimeout(_TMO_NORMAL)
889 """Modify hosts file with name
890
891 @type node: string
892 @param node: The node to call
893 @type mode: string
894 @param mode: The mode to operate. Currently "add" or "remove"
895 @type name: string
896 @param name: The host name to be modified
897 @type ip: string
898 @param ip: The ip of the entry (just valid if mode is "add")
899
900 """
901 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902
903 @_RpcTimeout(_TMO_NORMAL)
905 """Request verification of given parameters.
906
907 This is a multi-node call.
908
909 """
910 return self._MultiNodeCall(node_list, "node_verify",
911 [checkdict, cluster_name])
912
913 @classmethod
914 @_RpcTimeout(_TMO_FAST)
916 """Tells a node to activate itself as a master.
917
918 This is a single-node call.
919
920 """
921 return cls._StaticSingleNodeCall(node, "node_start_master",
922 [start_daemons, no_voting])
923
924 @classmethod
925 @_RpcTimeout(_TMO_FAST)
927 """Tells a node to demote itself from master status.
928
929 This is a single-node call.
930
931 """
932 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933
934 @classmethod
935 @_RpcTimeout(_TMO_URGENT)
937 """Query master info.
938
939 This is a multi-node call.
940
941 """
942
943 return cls._StaticMultiNodeCall(node_list, "master_info", [])
944
945 @classmethod
946 @_RpcTimeout(_TMO_URGENT)
948 """Query node version.
949
950 This is a multi-node call.
951
952 """
953 return cls._StaticMultiNodeCall(node_list, "version", [])
954
955 @_RpcTimeout(_TMO_NORMAL)
957 """Request creation of a given block device.
958
959 This is a single-node call.
960
961 """
962 return self._SingleNodeCall(node, "blockdev_create",
963 [bdev.ToDict(), size, owner, on_primary, info])
964
965 @_RpcTimeout(_TMO_SLOW)
967 """Request wipe at given offset with given size of a block device.
968
969 This is a single-node call.
970
971 """
972 return self._SingleNodeCall(node, "blockdev_wipe",
973 [bdev.ToDict(), offset, size])
974
975 @_RpcTimeout(_TMO_NORMAL)
977 """Request removal of a given block device.
978
979 This is a single-node call.
980
981 """
982 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983
984 @_RpcTimeout(_TMO_NORMAL)
986 """Request rename of the given block devices.
987
988 This is a single-node call.
989
990 """
991 return self._SingleNodeCall(node, "blockdev_rename",
992 [(d.ToDict(), uid) for d, uid in devlist])
993
994 @_RpcTimeout(_TMO_NORMAL)
996 """Request assembling of a given block device.
997
998 This is a single-node call.
999
1000 """
1001 return self._SingleNodeCall(node, "blockdev_assemble",
1002 [disk.ToDict(), owner, on_primary])
1003
1004 @_RpcTimeout(_TMO_NORMAL)
1006 """Request shutdown of a given block device.
1007
1008 This is a single-node call.
1009
1010 """
1011 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012
1013 @_RpcTimeout(_TMO_NORMAL)
1015 """Request adding a list of children to a (mirroring) device.
1016
1017 This is a single-node call.
1018
1019 """
1020 return self._SingleNodeCall(node, "blockdev_addchildren",
1021 [bdev.ToDict(),
1022 [disk.ToDict() for disk in ndevs]])
1023
1024 @_RpcTimeout(_TMO_NORMAL)
1026 """Request removing a list of children from a (mirroring) device.
1027
1028 This is a single-node call.
1029
1030 """
1031 return self._SingleNodeCall(node, "blockdev_removechildren",
1032 [bdev.ToDict(),
1033 [disk.ToDict() for disk in ndevs]])
1034
1035 @_RpcTimeout(_TMO_NORMAL)
1037 """Request status of a (mirroring) device.
1038
1039 This is a single-node call.
1040
1041 """
1042 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043 [dsk.ToDict() for dsk in disks])
1044 if not result.fail_msg:
1045 result.payload = [objects.BlockDevStatus.FromDict(i)
1046 for i in result.payload]
1047 return result
1048
1049 @_RpcTimeout(_TMO_NORMAL)
1051 """Request status of (mirroring) devices from multiple nodes.
1052
1053 This is a multi-node call.
1054
1055 """
1056 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057 [dict((name, [dsk.ToDict() for dsk in disks])
1058 for name, disks in node_disks.items())])
1059 for nres in result.values():
1060 if nres.fail_msg:
1061 continue
1062
1063 for idx, (success, status) in enumerate(nres.payload):
1064 if success:
1065 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1066
1067 return result
1068
1069 @_RpcTimeout(_TMO_NORMAL)
1071 """Request identification of a given block device.
1072
1073 This is a single-node call.
1074
1075 """
1076 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1077 if not result.fail_msg and result.payload is not None:
1078 result.payload = objects.BlockDevStatus.FromDict(result.payload)
1079 return result
1080
1081 @_RpcTimeout(_TMO_NORMAL)
1083 """Closes the given block devices.
1084
1085 This is a single-node call.
1086
1087 """
1088 params = [instance_name, [cf.ToDict() for cf in disks]]
1089 return self._SingleNodeCall(node, "blockdev_close", params)
1090
1091 @_RpcTimeout(_TMO_NORMAL)
1093 """Returns the size of the given disks.
1094
1095 This is a single-node call.
1096
1097 """
1098 params = [[cf.ToDict() for cf in disks]]
1099 return self._SingleNodeCall(node, "blockdev_getsize", params)
1100
1101 @_RpcTimeout(_TMO_NORMAL)
1103 """Disconnects the network of the given drbd devices.
1104
1105 This is a multi-node call.
1106
1107 """
1108 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1109 [nodes_ip, [cf.ToDict() for cf in disks]])
1110
1111 @_RpcTimeout(_TMO_NORMAL)
1114 """Connects the network of the given drbd devices.
1115
1116 This is a multi-node call.
1117
1118 """
1119 return self._MultiNodeCall(node_list, "drbd_attach_net",
1120 [nodes_ip, [cf.ToDict() for cf in disks],
1121 instance_name, multimaster])
1122
1123 @_RpcTimeout(_TMO_SLOW)
1125 """Waits until the synchronization of drbd devices is complete.
1126
1127 This is a multi-node call.
1128
1129 """
1130 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1131 [nodes_ip, [cf.ToDict() for cf in disks]])
1132
1133 @_RpcTimeout(_TMO_URGENT)
1135 """Gets drbd helper.
1136
1137 This is a multi-node call.
1138
1139 """
1140 return self._MultiNodeCall(node_list, "drbd_helper", [])
1141
1142 @classmethod
1143 @_RpcTimeout(_TMO_NORMAL)
1145 """Upload a file.
1146
1147 The node will refuse the operation in case the file is not on the
1148 approved file list.
1149
1150 This is a multi-node call.
1151
1152 @type node_list: list
1153 @param node_list: the list of node names to upload to
1154 @type file_name: str
1155 @param file_name: the filename to upload
1156 @type address_list: list or None
1157 @keyword address_list: an optional list of node addresses, in order
1158 to optimize the RPC speed
1159
1160 """
1161 file_contents = utils.ReadFile(file_name)
1162 data = cls._Compress(file_contents)
1163 st = os.stat(file_name)
1164 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1165 st.st_atime, st.st_mtime]
1166 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1167 address_list=address_list)
1168
1169 @classmethod
1170 @_RpcTimeout(_TMO_NORMAL)
1172 """Write ssconf files.
1173
1174 This is a multi-node call.
1175
1176 """
1177 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1178
1179 @_RpcTimeout(_TMO_FAST)
1181 """Request a diagnose of OS definitions.
1182
1183 This is a multi-node call.
1184
1185 """
1186 return self._MultiNodeCall(node_list, "os_diagnose", [])
1187
1188 @_RpcTimeout(_TMO_FAST)
1190 """Returns an OS definition.
1191
1192 This is a single-node call.
1193
1194 """
1195 result = self._SingleNodeCall(node, "os_get", [name])
1196 if not result.fail_msg and isinstance(result.payload, dict):
1197 result.payload = objects.OS.FromDict(result.payload)
1198 return result
1199
1200 @_RpcTimeout(_TMO_FAST)
1202 """Run a validation routine for a given OS.
1203
1204 This is a multi-node call.
1205
1206 """
1207 return self._MultiNodeCall(nodes, "os_validate",
1208 [required, name, checks, params])
1209
1210 @_RpcTimeout(_TMO_NORMAL)
1212 """Call the hooks runner.
1213
1214 Args:
1215 - op: the OpCode instance
1216 - env: a dictionary with the environment
1217
1218 This is a multi-node call.
1219
1220 """
1221 params = [hpath, phase, env]
1222 return self._MultiNodeCall(node_list, "hooks_runner", params)
1223
1224 @_RpcTimeout(_TMO_NORMAL)
1226 """Call an iallocator on a remote node
1227
1228 Args:
1229 - name: the iallocator name
1230 - input: the json-encoded input string
1231
1232 This is a single-node call.
1233
1234 """
1235 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1236
1237 @_RpcTimeout(_TMO_NORMAL)
1239 """Request growing of the given block device by the given amount.
1240
1241 This is a single-node call.
1242
1243 """
1244 return self._SingleNodeCall(node, "blockdev_grow",
1245 [cf_bdev.ToDict(), amount])
1246
1247 @_RpcTimeout(_TMO_1DAY)
1250 """Export a given disk to another node.
1251
1252 This is a single-node call.
1253
1254 """
1255 return self._SingleNodeCall(node, "blockdev_export",
1256 [cf_bdev.ToDict(), dest_node, dest_path,
1257 cluster_name])
1258
1259 @_RpcTimeout(_TMO_NORMAL)
1261 """Request a snapshot of the given block device.
1262
1263 This is a single-node call.
1264
1265 """
1266 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1267
1268 @_RpcTimeout(_TMO_NORMAL)
1270 """Request the completion of an export operation.
1271
1272 This writes the export config file, etc.
1273
1274 This is a single-node call.
1275
1276 """
1277 flat_disks = []
1278 for disk in snap_disks:
1279 if isinstance(disk, bool):
1280 flat_disks.append(disk)
1281 else:
1282 flat_disks.append(disk.ToDict())
1283
1284 return self._SingleNodeCall(node, "finalize_export",
1285 [self._InstDict(instance), flat_disks])
1286
1287 @_RpcTimeout(_TMO_FAST)
1289 """Queries the export information in a given path.
1290
1291 This is a single-node call.
1292
1293 """
1294 return self._SingleNodeCall(node, "export_info", [path])
1295
1296 @_RpcTimeout(_TMO_FAST)
1298 """Gets the stored exports list.
1299
1300 This is a multi-node call.
1301
1302 """
1303 return self._MultiNodeCall(node_list, "export_list", [])
1304
1305 @_RpcTimeout(_TMO_FAST)
1307 """Requests removal of a given export.
1308
1309 This is a single-node call.
1310
1311 """
1312 return self._SingleNodeCall(node, "export_remove", [export])
1313
1314 @classmethod
1315 @_RpcTimeout(_TMO_NORMAL)
1317 """Requests a node to clean the cluster information it has.
1318
1319 This will remove the configuration information from the ganeti data
1320 dir.
1321
1322 This is a single-node call.
1323
1324 """
1325 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1326 [modify_ssh_setup])
1327
1328 @_RpcTimeout(_TMO_FAST)
1330 """Gets all volumes on node(s).
1331
1332 This is a multi-node call.
1333
1334 """
1335 return self._MultiNodeCall(node_list, "node_volumes", [])
1336
1337 @_RpcTimeout(_TMO_FAST)
1339 """Demote a node from the master candidate role.
1340
1341 This is a single-node call.
1342
1343 """
1344 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1345
1346 @_RpcTimeout(_TMO_NORMAL)
1348 """Tries to powercycle a node.
1349
1350 This is a single-node call.
1351
1352 """
1353 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1354
1355 @_RpcTimeout(None)
1357 """Sleep for a fixed time on given node(s).
1358
1359 This is a multi-node call.
1360
1361 """
1362 return self._MultiNodeCall(node_list, "test_delay", [duration],
1363 read_timeout=int(duration + 5))
1364
1365 @_RpcTimeout(_TMO_FAST)
1367 """Create the given file storage directory.
1368
1369 This is a single-node call.
1370
1371 """
1372 return self._SingleNodeCall(node, "file_storage_dir_create",
1373 [file_storage_dir])
1374
1375 @_RpcTimeout(_TMO_FAST)
1377 """Remove the given file storage directory.
1378
1379 This is a single-node call.
1380
1381 """
1382 return self._SingleNodeCall(node, "file_storage_dir_remove",
1383 [file_storage_dir])
1384
1385 @_RpcTimeout(_TMO_FAST)
1388 """Rename file storage directory.
1389
1390 This is a single-node call.
1391
1392 """
1393 return self._SingleNodeCall(node, "file_storage_dir_rename",
1394 [old_file_storage_dir, new_file_storage_dir])
1395
1396 @classmethod
1397 @_RpcTimeout(_TMO_FAST)
1399 """Update job queue.
1400
1401 This is a multi-node call.
1402
1403 """
1404 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1405 [file_name, cls._Compress(content)],
1406 address_list=address_list)
1407
1408 @classmethod
1409 @_RpcTimeout(_TMO_NORMAL)
1411 """Purge job queue.
1412
1413 This is a single-node call.
1414
1415 """
1416 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1417
1418 @classmethod
1419 @_RpcTimeout(_TMO_FAST)
1421 """Rename a job queue file.
1422
1423 This is a multi-node call.
1424
1425 """
1426 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1427 address_list=address_list)
1428
1429 @_RpcTimeout(_TMO_NORMAL)
1431 """Validate the hypervisor params.
1432
1433 This is a multi-node call.
1434
1435 @type node_list: list
1436 @param node_list: the list of nodes to query
1437 @type hvname: string
1438 @param hvname: the hypervisor name
1439 @type hvparams: dict
1440 @param hvparams: the hypervisor parameters to be validated
1441
1442 """
1443 cluster = self._cfg.GetClusterInfo()
1444 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1445 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1446 [hvname, hv_full])
1447
1448 @_RpcTimeout(_TMO_NORMAL)
1450 """Creates a new X509 certificate for SSL/TLS.
1451
1452 This is a single-node call.
1453
1454 @type validity: int
1455 @param validity: Validity in seconds
1456
1457 """
1458 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1459
1460 @_RpcTimeout(_TMO_NORMAL)
1462 """Removes an X509 certificate.
1463
1464 This is a single-node call.
1465
1466 @type name: string
1467 @param name: Certificate name
1468
1469 """
1470 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1471
1472 @_RpcTimeout(_TMO_NORMAL)
1474 """Starts a listener for an import.
1475
1476 This is a single-node call.
1477
1478 @type node: string
1479 @param node: Node name
1480 @type instance: C{objects.Instance}
1481 @param instance: Instance object
1482
1483 """
1484 return self._SingleNodeCall(node, "import_start",
1485 [opts.ToDict(),
1486 self._InstDict(instance), dest,
1487 _EncodeImportExportIO(dest, dest_args)])
1488
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Launches an export daemon on the given node.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: Export options object; serialized with its C{ToDict()}
      method before being sent
  @param host: Forwarded unchanged as the second RPC argument
  @param port: Forwarded unchanged as the third RPC argument
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: Forwarded unchanged and also used, together with
      C{source_args}, to build the final RPC argument
  @param source_args: Arguments belonging to C{source}; encoded via
      C{_EncodeImportExportIO}
  @return: Whatever C{_SingleNodeCall} returns for the "export_start" call

  """
  # Build the positional RPC arguments one by one, in the order the remote
  # "export_start" procedure receives them.
  serialized_opts = opts.ToDict()
  instance_data = self._InstDict(instance)
  encoded_source_args = _EncodeImportExportIO(source, source_args)

  rpc_args = [
    serialized_opts,
    host,
    port,
    instance_data,
    source,
    encoded_source_args,
    ]

  return self._SingleNodeCall(node, "export_start", rpc_args)
1506
1507 @_RpcTimeout(_TMO_FAST)
1509 """Gets the status of an import or export.
1510
1511 This is a single-node call.
1512
1513 @type node: string
1514 @param node: Node name
1515 @type names: List of strings
1516 @param names: Import/export names
1517 @rtype: List of L{objects.ImportExportStatus} instances
1518 @return: Returns a list of the state of each named import/export or None if
1519 a status couldn't be retrieved
1520
1521 """
1522 result = self._SingleNodeCall(node, "impexp_status", [names])
1523
1524 if not result.fail_msg:
1525 decoded = []
1526
1527 for i in result.payload:
1528 if i is None:
1529 decoded.append(None)
1530 continue
1531 decoded.append(objects.ImportExportStatus.FromDict(i))
1532
1533 result.payload = decoded
1534
1535 return result
1536
1537 @_RpcTimeout(_TMO_NORMAL)
1539 """Aborts an import or export.
1540
1541 This is a single-node call.
1542
1543 @type node: string
1544 @param node: Node name
1545 @type name: string
1546 @param name: Import/export name
1547
1548 """
1549 return self._SingleNodeCall(node, "impexp_abort", [name])
1550
1551 @_RpcTimeout(_TMO_NORMAL)
1553 """Cleans up after an import or export.
1554
1555 This is a single-node call.
1556
1557 @type node: string
1558 @param node: Node name
1559 @type name: string
1560 @param name: Import/export name
1561
1562 """
1563 return self._SingleNodeCall(node, "impexp_cleanup", [name])
1564