1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Inter-node RPC library.
23
24 """
25
26
27
28
29
30
31
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47
48
49 import ganeti.http.client
50
51
52
# Connection timeout handed to pycurl as CONNECTTIMEOUT when a curl handle
# is configured for RPC use (libcurl interprets this value as seconds).
_RPC_CONNECT_TIMEOUT = 5

# HTTP headers attached to every RPC request built by the client.
_RPC_CLIENT_HEADERS = [
    "Content-type: %s" % http.HTTP_APP_JSON,
    # NOTE(review): an empty "Expect:" header presumably stops curl from
    # sending "Expect: 100-continue" -- confirm against the curl docs.
    "Expect:",
]

# Named timeout values passed to the _RpcTimeout decorator on the call_*
# functions.  They end up as the default read timeout of a request; the
# unit is presumably seconds -- TODO confirm against the HTTP client.
_TMO_URGENT = 60
_TMO_FAST = 5 * 60
_TMO_NORMAL = 15 * 60
_TMO_SLOW = 3600
_TMO_4HRS = 4 * 3600
_TMO_1DAY = 86400

# Timeout table, keyed by procedure name (the call_* function name without
# its "call_" prefix).  It starts out empty and is filled in by the
# _RpcTimeout decorator; the RPC client checks it to make sure a procedure
# was declared and to pick the default read timeout.
_TIMEOUTS = {
}
80 """Initializes the module-global HTTP client manager.
81
82 Must be called before using any RPC function and while exactly one thread is
83 running.
84
85 """
86
87
88
89 assert threading.activeCount() == 1, \
90 "Found more than one active thread when initializing pycURL"
91
92 logging.info("Using PycURL %s", pycurl.version)
93
94 pycurl.global_init(pycurl.GLOBAL_ALL)
95
98 """Stops the module-global HTTP client manager.
99
100 Must be called before quitting the program and while exactly one thread is
101 running.
102
103 """
104 pycurl.global_cleanup()
105
108 noded_cert = str(constants.NODED_CERT_FILE)
109
110 curl.setopt(pycurl.FOLLOWLOCATION, False)
111 curl.setopt(pycurl.CAINFO, noded_cert)
112 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113 curl.setopt(pycurl.SSL_VERIFYPEER, True)
114 curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115 curl.setopt(pycurl.SSLCERT, noded_cert)
116 curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117 curl.setopt(pycurl.SSLKEY, noded_cert)
118 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
123 """Returns a per-thread HTTP client pool.
124
125 @rtype: L{http.client.HttpClientPool}
126
127 """
128 try:
129 pool = self.hcp
130 except AttributeError:
131 pool = http.client.HttpClientPool(_ConfigRpcCurl)
132 self.hcp = pool
133
134 return pool
135
136
137 _thread_local = _RpcThreadLocal()
141 """Timeout decorator.
142
143 When applied to a rpc call_* function, it updates the global timeout
144 table with the given function/timeout.
145
146 """
147 def decorator(f):
148 name = f.__name__
149 assert name.startswith("call_")
150 _TIMEOUTS[name[len("call_"):]] = secs
151 return f
152 return decorator
153
156 """RPC-wrapper decorator.
157
158 When applied to a function, it runs it with the RPC system
initialized, and it shuts down the system afterwards. This means the
160 function must be called without RPC being initialized.
161
162 """
163 def wrapper(*args, **kwargs):
164 Init()
165 try:
166 return fn(*args, **kwargs)
167 finally:
168 Shutdown()
169 return wrapper
170
173 """RPC Result class.
174
175 This class holds an RPC result. It is needed since in multi-node
calls we can't raise an exception just because one out of many
177 failed, and therefore we use this class to encapsulate the result.
178
179 @ivar data: the data payload, for successful results, or None
180 @ivar call: the name of the RPC call
181 @ivar node: the name of the node to which we made the call
182 @ivar offline: whether the operation failed because the node was
183 offline, as opposed to actual failure; offline=True will always
184 imply failed=True, in order to allow simpler checking if
185 the user doesn't care about the exact failure mode
186 @ivar fail_msg: the error message if the call failed
187
188 """
def __init__(self, data=None, failed=False, offline=False,
             call=None, node=None):
    """Builds the result object for one RPC call to one node.

    @param data: the raw result received from the node; for a call that
        went through this must be a two-element list or tuple of
        (status, payload), for a failed call it is the error information
    @type failed: boolean
    @param failed: whether the call itself failed
    @type offline: boolean
    @param offline: whether the node is marked offline (takes precedence
        over C{failed})
    @param call: the name of the RPC call
    @param node: the name of the node the call was made to

    """
    self.offline = offline
    self.call = call
    self.node = node

    # Failure is the default outcome; only a well-formed, successful
    # reply overrides the payload below.
    self.payload = None

    if offline or failed:
        self.data = None
        if offline:
            self.fail_msg = "Node is marked offline"
        else:
            self.fail_msg = self._EnsureErr(data)
    else:
        self.data = data
        if not isinstance(data, (tuple, list)):
            self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                             type(data))
        elif len(data) != 2:
            self.fail_msg = ("RPC layer error: invalid result length (%d), "
                             "expected 2" % len(data))
        elif data[0]:
            # The remote side reported success
            self.fail_msg = None
            self.payload = data[1]
        else:
            # The remote side reported an error; make sure the message
            # evaluates to true
            self.fail_msg = self._EnsureErr(data[1])

    # Every code path above must have set the complete attribute set
    assert hasattr(self, "call")
    assert hasattr(self, "data")
    assert hasattr(self, "fail_msg")
    assert hasattr(self, "node")
    assert hasattr(self, "offline")
    assert hasattr(self, "payload")
225
226 @staticmethod
228 """Helper to ensure we return a 'True' value for error."""
229 if val:
230 return val
231 else:
232 return "No error information"
233
def Raise(self, msg, prereq=False, ecode=None):
    """Raises an exception if this result represents a failure.

    LU code can call this instead of inspecting every result by hand;
    for a successful result the method simply returns.

    @param msg: text to put in front of the failure message; if empty,
        a generic message naming the call and the node is used instead
    @type prereq: boolean
    @param prereq: raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: if not None, passed to the exception as second argument
    @raise errors.OpPrereqError: on failure, when C{prereq} is true
    @raise errors.OpExecError: on failure, when C{prereq} is false

    """
    failure = self.fail_msg
    if not failure:
        return

    if msg:
        text = "%s: %s" % (msg, failure)
    else:
        text = ("Call '%s' to node '%s' has failed: %s" %
                (self.call, self.node, failure))

    exc_class = errors.OpPrereqError if prereq else errors.OpExecError

    if ecode is None:
        raise exc_class(text)
    raise exc_class(text, ecode)
258
261 """RPC Client class.
262
263 This class, given a (remote) method name, a list of parameters and a
264 list of nodes, will contact (in parallel) all nodes, and return a
265 dict of results (key: node name, value: result).
266
267 One current bug is that generic failure is still signaled by
268 'False' result, which is not good. This overloading of values can
269 cause bugs.
270
271 """
def __init__(self, procedure, body, port):
    """Prepares a client for one remote procedure.

    @param procedure: the name of the remote procedure; it must have been
        declared in the timeouts table
    @param body: the request body sent to every node
    @param port: the port the nodes are contacted on

    """
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    # Pending requests, indexed by node name
    self._request = {}
    self.port = port
    self.body = body
    self.procedure = procedure
279
def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Registers several nodes as targets of the call.

    @type node_list: list
    @param node_list: the names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: None, or the node addresses in the same order
        (and of the same length) as C{node_list}
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation

    """
    if address_list is not None:
        assert len(node_list) == len(address_list), (
            "Name and address lists should have the same length")
        pairs = zip(node_list, address_list)
    else:
        # No addresses known, ConnectNode falls back to the node names
        pairs = [(name, None) for name in node_list]

    for name, addr in pairs:
        self.ConnectNode(name, addr, read_timeout=read_timeout)
300
def ConnectNode(self, name, address=None, read_timeout=None):
    """Registers a single node as target of the call.

    @type name: str
    @param name: the node name
    @type address: str
    @keyword address: the node address, if known; the node name is used
        when it is None
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout, which is
        taken from the timeouts table entry of the procedure

    """
    target = name if address is None else address

    timeout = read_timeout
    if timeout is None:
        timeout = _TIMEOUTS[self.procedure]

    request = http.client.HttpClientRequest(str(target), self.port,
                                            http.HTTP_PUT,
                                            str("/%s" % self.procedure),
                                            headers=_RPC_CLIENT_HEADERS,
                                            post_data=str(self.body),
                                            read_timeout=timeout)
    # A later registration for the same node name replaces the earlier one
    self._request[name] = request
322
324 """Call nodes and return results.
325
326 @rtype: list
327 @return: List of RPC results
328
329 """
330 if not http_pool:
331 http_pool = _thread_local.GetHttpClientPool()
332
333 http_pool.ProcessRequests(self._request.values())
334
335 results = {}
336
337 for name, req in self._request.iteritems():
338 if req.success and req.resp_status_code == http.HTTP_OK:
339 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
340 node=name, call=self.procedure)
341 continue
342
343
344 if req.error:
345 msg = req.error
346 else:
347 msg = req.resp_body
348
349 logging.error("RPC error in %s from node %s: %s",
350 self.procedure, name, msg)
351 results[name] = RpcResult(data=msg, failed=True, node=name,
352 call=self.procedure)
353
354 return results
355
358 """Encodes import/export I/O information.
359
360 """
361 if ieio == constants.IEIO_RAW_DISK:
362 assert len(ieioargs) == 1
363 return (ieioargs[0].ToDict(), )
364
365 if ieio == constants.IEIO_SCRIPT:
366 assert len(ieioargs) == 2
367 return (ieioargs[0].ToDict(), ieioargs[1])
368
369 return ieioargs
370
373 """RPC runner class"""
374
"""Initializes the RPC runner.
377
378 @type cfg: C{config.ConfigWriter}
379 @param cfg: the configuration object that will be used to get data
380 about the cluster
381
382 """
383 self._cfg = cfg
384 self.port = netutils.GetDaemonPort(constants.NODED)
385
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Serializes an instance, with all parameters filled in.

    The dict comes from the instance's ToDict() method; its hypervisor,
    backend, OS and NIC parameters are then replaced by versions which
    have the cluster defaults filled in.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the parameters filled with the
        cluster defaults and the given overrides applied on top

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hvparams = cluster.FillHV(instance)
    if hvp is not None:
        hvparams.update(hvp)
    idict["hvparams"] = hvparams

    beparams = cluster.FillBE(instance)
    if bep is not None:
        beparams.update(bep)
    idict["beparams"] = beparams

    osparams = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
        osparams.update(osp)
    idict["osparams"] = osparams

    for nic in idict["nics"]:
        # The defaults are looked up per NIC on purpose, so that nothing
        # is read from the cluster for an instance without NICs
        defaults = cluster.nicparams[constants.PP_DEFAULT]
        nic["nicparams"] = objects.FillDict(defaults, nic["nicparams"])

    return idict
421
def _ConnectList(self, client, node_list, call, read_timeout=None):
    """Connects a client to a list of nodes, resolving their addresses.

    Nodes marked offline in the configuration are not contacted; a
    ready-made offline result is returned for each of them instead.
    Nodes unknown to the configuration are connected without an address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node_list: list
    @param node_list: the node list we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @rtype: dict
    @return: the results for the skipped (offline) nodes, indexed by
        node name

    """
    known = self._cfg.GetAllNodesInfo()
    offline_results = {}
    names = []
    addresses = []

    for node in node_list:
        if node not in known:
            address = None
        elif known[node].offline:
            offline_results[node] = RpcResult(node=node, offline=True,
                                              call=call)
            continue
        else:
            address = known[node].primary_ip
        names.append(node)
        addresses.append(address)

    if names:
        client.ConnectList(names, address_list=addresses,
                           read_timeout=read_timeout)

    return offline_results
455
def _ConnectNode(self, client, node, call, read_timeout=None):
    """Connects a client to one node, resolving its address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node: str
    @param node: the node we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @return: an offline L{RpcResult} if the node is marked offline (the
        client is left untouched in that case), otherwise None

    """
    node_info = self._cfg.GetNodeInfo(node)

    if node_info is None:
        # Node not in the configuration, let the client use its name
        addr = None
    elif node_info.offline:
        return RpcResult(node=node, offline=True, call=call)
    else:
        addr = node_info.primary_ip

    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
    return None
479
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Runs a procedure on several nodes.

    @param node_list: the nodes to call
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON for the request body
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @rtype: dict
    @return: the results indexed by node name; offline nodes are
        included with an offline result

    """
    client = Client(procedure, serializer.DumpJson(args, indent=False),
                    self.port)
    # Start from the results of the offline nodes, then add what the
    # contacted nodes returned
    results = self._ConnectList(client, node_list, procedure,
                                read_timeout=read_timeout)
    results.update(client.GetResults())
    return results
490
491 @classmethod
492 - def _StaticMultiNodeCall(cls, node_list, procedure, args,
493 address_list=None, read_timeout=None):
502
504 """Helper for making a single-node call
505
506 """
507 body = serializer.DumpJson(args, indent=False)
508 c = Client(procedure, body, self.port)
509 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
510 if result is None:
511
512 result = c.GetResults()[node]
513 return result
514
515 @classmethod
524
525 @staticmethod
527 """Compresses a string for transport over RPC.
528
529 Small amounts of data are not compressed.
530
531 @type data: str
532 @param data: Data
533 @rtype: tuple
534 @return: Encoded data to send
535
536 """
537
538 if len(data) < 512:
539 return (constants.RPC_ENCODING_NONE, data)
540
541
542 return (constants.RPC_ENCODING_ZLIB_BASE64,
543 base64.b64encode(zlib.compress(data, 3)))
544
545
546
547
548
549 @_RpcTimeout(_TMO_URGENT)
551 """Gets the logical volumes present in a given volume group.
552
553 This is a multi-node call.
554
555 """
556 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
557
558 @_RpcTimeout(_TMO_URGENT)
560 """Gets the volume group list.
561
562 This is a multi-node call.
563
564 """
565 return self._MultiNodeCall(node_list, "vg_list", [])
566
567 @_RpcTimeout(_TMO_NORMAL)
569 """Get list of storage units.
570
571 This is a multi-node call.
572
573 """
574 return self._MultiNodeCall(node_list, "storage_list",
575 [su_name, su_args, name, fields])
576
577 @_RpcTimeout(_TMO_NORMAL)
579 """Modify a storage unit.
580
581 This is a single-node call.
582
583 """
584 return self._SingleNodeCall(node, "storage_modify",
585 [su_name, su_args, name, changes])
586
587 @_RpcTimeout(_TMO_NORMAL)
589 """Executes an operation on a storage unit.
590
591 This is a single-node call.
592
593 """
594 return self._SingleNodeCall(node, "storage_execute",
595 [su_name, su_args, name, op])
596
597 @_RpcTimeout(_TMO_URGENT)
599 """Checks if a node has all the bridges given.
600
601 This method checks if all bridges given in the bridges_list are
602 present on the remote node, so that an instance that uses interfaces
603 on those bridges can be started.
604
605 This is a single-node call.
606
607 """
608 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
609
610 @_RpcTimeout(_TMO_NORMAL)
612 """Starts an instance.
613
614 This is a single-node call.
615
616 """
617 idict = self._InstDict(instance, hvp=hvp, bep=bep)
618 return self._SingleNodeCall(node, "instance_start", [idict])
619
620 @_RpcTimeout(_TMO_NORMAL)
622 """Stops an instance.
623
624 This is a single-node call.
625
626 """
627 return self._SingleNodeCall(node, "instance_shutdown",
628 [self._InstDict(instance), timeout])
629
630 @_RpcTimeout(_TMO_NORMAL)
632 """Gather the information necessary to prepare an instance migration.
633
634 This is a single-node call.
635
636 @type node: string
637 @param node: the node on which the instance is currently running
638 @type instance: C{objects.Instance}
639 @param instance: the instance definition
640
641 """
642 return self._SingleNodeCall(node, "migration_info",
643 [self._InstDict(instance)])
644
645 @_RpcTimeout(_TMO_NORMAL)
647 """Prepare a node to accept an instance.
648
649 This is a single-node call.
650
651 @type node: string
652 @param node: the target node for the migration
653 @type instance: C{objects.Instance}
654 @param instance: the instance definition
655 @type info: opaque/hypervisor specific (string/data)
656 @param info: result for the call_migration_info call
657 @type target: string
658 @param target: target hostname (usually ip address) (on the node itself)
659
660 """
661 return self._SingleNodeCall(node, "accept_instance",
662 [self._InstDict(instance), info, target])
663
664 @_RpcTimeout(_TMO_NORMAL)
666 """Finalize any target-node migration specific operation.
667
668 This is called both in case of a successful migration and in case of error
669 (in which case it should abort the migration).
670
671 This is a single-node call.
672
673 @type node: string
674 @param node: the target node for the migration
675 @type instance: C{objects.Instance}
676 @param instance: the instance definition
677 @type info: opaque/hypervisor specific (string/data)
678 @param info: result for the call_migration_info call
679 @type success: boolean
680 @param success: whether the migration was a success or a failure
681
682 """
683 return self._SingleNodeCall(node, "finalize_migration",
684 [self._InstDict(instance), info, success])
685
686 @_RpcTimeout(_TMO_SLOW)
688 """Migrate an instance.
689
690 This is a single-node call.
691
692 @type node: string
693 @param node: the node on which the instance is currently running
694 @type instance: C{objects.Instance}
695 @param instance: the instance definition
696 @type target: string
697 @param target: the target node name
698 @type live: boolean
699 @param live: whether the migration should be done live or not (the
700 interpretation of this parameter is left to the hypervisor)
701
702 """
703 return self._SingleNodeCall(node, "instance_migrate",
704 [self._InstDict(instance), target, live])
705
706 @_RpcTimeout(_TMO_NORMAL)
708 """Reboots an instance.
709
710 This is a single-node call.
711
712 """
713 return self._SingleNodeCall(node, "instance_reboot",
714 [self._InstDict(inst), reboot_type,
715 shutdown_timeout])
716
717 @_RpcTimeout(_TMO_1DAY)
719 """Installs an OS on the given instance.
720
721 This is a single-node call.
722
723 """
724 return self._SingleNodeCall(node, "instance_os_add",
725 [self._InstDict(inst), reinstall, debug])
726
727 @_RpcTimeout(_TMO_SLOW)
729 """Run the OS rename script for an instance.
730
731 This is a single-node call.
732
733 """
734 return self._SingleNodeCall(node, "instance_run_rename",
735 [self._InstDict(inst), old_name, debug])
736
737 @_RpcTimeout(_TMO_URGENT)
739 """Returns information about a single instance.
740
741 This is a single-node call.
742
@type node: string
@param node: the node to query
745 @type instance: string
746 @param instance: the instance name
747 @type hname: string
748 @param hname: the hypervisor type of the instance
749
750 """
751 return self._SingleNodeCall(node, "instance_info", [instance, hname])
752
753 @_RpcTimeout(_TMO_NORMAL)
755 """Checks whether the given instance can be migrated.
756
757 This is a single-node call.
758
759 @param node: the node to query
760 @type instance: L{objects.Instance}
761 @param instance: the instance to check
762
763
764 """
765 return self._SingleNodeCall(node, "instance_migratable",
766 [self._InstDict(instance)])
767
768 @_RpcTimeout(_TMO_URGENT)
770 """Returns information about all instances on the given nodes.
771
772 This is a multi-node call.
773
774 @type node_list: list
775 @param node_list: the list of nodes to query
776 @type hypervisor_list: list
777 @param hypervisor_list: the hypervisors to query for instances
778
779 """
780 return self._MultiNodeCall(node_list, "all_instances_info",
781 [hypervisor_list])
782
783 @_RpcTimeout(_TMO_URGENT)
785 """Returns the list of running instances on a given node.
786
787 This is a multi-node call.
788
789 @type node_list: list
790 @param node_list: the list of nodes to query
791 @type hypervisor_list: list
792 @param hypervisor_list: the hypervisors to query for instances
793
794 """
795 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
796
797 @_RpcTimeout(_TMO_FAST)
800 """Do a TcpPing on the remote node
801
802 This is a single-node call.
803
804 """
805 return self._SingleNodeCall(node, "node_tcp_ping",
806 [source, target, port, timeout,
807 live_port_needed])
808
809 @_RpcTimeout(_TMO_FAST)
811 """Checks if a node has the given IP address.
812
813 This is a single-node call.
814
815 """
816 return self._SingleNodeCall(node, "node_has_ip_address", [address])
817
818 @_RpcTimeout(_TMO_URGENT)
820 """Return node information.
821
822 This will return memory information and volume group size and free
823 space.
824
825 This is a multi-node call.
826
827 @type node_list: list
828 @param node_list: the list of nodes to query
829 @type vg_name: C{string}
830 @param vg_name: the name of the volume group to ask for disk space
831 information
832 @type hypervisor_type: C{str}
833 @param hypervisor_type: the name of the hypervisor to ask for
834 memory information
835
836 """
837 return self._MultiNodeCall(node_list, "node_info",
838 [vg_name, hypervisor_type])
839
840 @_RpcTimeout(_TMO_NORMAL)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    The six key arguments are handed to the remote "node_add" procedure
    unchanged and in the order given.

    This is a single-node call.

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", params)
849
850 @_RpcTimeout(_TMO_NORMAL)
852 """Request verification of given parameters.
853
854 This is a multi-node call.
855
856 """
857 return self._MultiNodeCall(node_list, "node_verify",
858 [checkdict, cluster_name])
859
860 @classmethod
861 @_RpcTimeout(_TMO_FAST)
863 """Tells a node to activate itself as a master.
864
865 This is a single-node call.
866
867 """
868 return cls._StaticSingleNodeCall(node, "node_start_master",
869 [start_daemons, no_voting])
870
871 @classmethod
872 @_RpcTimeout(_TMO_FAST)
874 """Tells a node to demote itself from master status.
875
876 This is a single-node call.
877
878 """
879 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
880
881 @classmethod
882 @_RpcTimeout(_TMO_URGENT)
884 """Query master info.
885
886 This is a multi-node call.
887
888 """
889
890 return cls._StaticMultiNodeCall(node_list, "master_info", [])
891
892 @classmethod
893 @_RpcTimeout(_TMO_URGENT)
895 """Query node version.
896
897 This is a multi-node call.
898
899 """
900 return cls._StaticMultiNodeCall(node_list, "version", [])
901
902 @_RpcTimeout(_TMO_NORMAL)
904 """Request creation of a given block device.
905
906 This is a single-node call.
907
908 """
909 return self._SingleNodeCall(node, "blockdev_create",
910 [bdev.ToDict(), size, owner, on_primary, info])
911
912 @_RpcTimeout(_TMO_NORMAL)
914 """Request removal of a given block device.
915
916 This is a single-node call.
917
918 """
919 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
920
921 @_RpcTimeout(_TMO_NORMAL)
923 """Request rename of the given block devices.
924
925 This is a single-node call.
926
927 """
928 return self._SingleNodeCall(node, "blockdev_rename",
929 [(d.ToDict(), uid) for d, uid in devlist])
930
931 @_RpcTimeout(_TMO_NORMAL)
933 """Request assembling of a given block device.
934
935 This is a single-node call.
936
937 """
938 return self._SingleNodeCall(node, "blockdev_assemble",
939 [disk.ToDict(), owner, on_primary])
940
941 @_RpcTimeout(_TMO_NORMAL)
943 """Request shutdown of a given block device.
944
945 This is a single-node call.
946
947 """
948 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
949
950 @_RpcTimeout(_TMO_NORMAL)
952 """Request adding a list of children to a (mirroring) device.
953
954 This is a single-node call.
955
956 """
957 return self._SingleNodeCall(node, "blockdev_addchildren",
958 [bdev.ToDict(),
959 [disk.ToDict() for disk in ndevs]])
960
961 @_RpcTimeout(_TMO_NORMAL)
963 """Request removing a list of children from a (mirroring) device.
964
965 This is a single-node call.
966
967 """
968 return self._SingleNodeCall(node, "blockdev_removechildren",
969 [bdev.ToDict(),
970 [disk.ToDict() for disk in ndevs]])
971
972 @_RpcTimeout(_TMO_NORMAL)
974 """Request status of a (mirroring) device.
975
976 This is a single-node call.
977
978 """
979 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
980 [dsk.ToDict() for dsk in disks])
981 if not result.fail_msg:
982 result.payload = [objects.BlockDevStatus.FromDict(i)
983 for i in result.payload]
984 return result
985
986 @_RpcTimeout(_TMO_NORMAL)
988 """Request identification of a given block device.
989
990 This is a single-node call.
991
992 """
993 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
994 if not result.fail_msg and result.payload is not None:
995 result.payload = objects.BlockDevStatus.FromDict(result.payload)
996 return result
997
998 @_RpcTimeout(_TMO_NORMAL)
1000 """Closes the given block devices.
1001
1002 This is a single-node call.
1003
1004 """
1005 params = [instance_name, [cf.ToDict() for cf in disks]]
1006 return self._SingleNodeCall(node, "blockdev_close", params)
1007
1008 @_RpcTimeout(_TMO_NORMAL)
1010 """Returns the size of the given disks.
1011
1012 This is a single-node call.
1013
1014 """
1015 params = [[cf.ToDict() for cf in disks]]
1016 return self._SingleNodeCall(node, "blockdev_getsize", params)
1017
1018 @_RpcTimeout(_TMO_NORMAL)
1020 """Disconnects the network of the given drbd devices.
1021
1022 This is a multi-node call.
1023
1024 """
1025 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1026 [nodes_ip, [cf.ToDict() for cf in disks]])
1027
1028 @_RpcTimeout(_TMO_NORMAL)
"""Connects the given drbd devices.
1032
1033 This is a multi-node call.
1034
1035 """
1036 return self._MultiNodeCall(node_list, "drbd_attach_net",
1037 [nodes_ip, [cf.ToDict() for cf in disks],
1038 instance_name, multimaster])
1039
1040 @_RpcTimeout(_TMO_SLOW)
"""Waits until the synchronization of drbd devices is complete.
1043
1044 This is a multi-node call.
1045
1046 """
1047 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1048 [nodes_ip, [cf.ToDict() for cf in disks]])
1049
1050 @_RpcTimeout(_TMO_URGENT)
1052 """Gets drbd helper.
1053
1054 This is a multi-node call.
1055
1056 """
1057 return self._MultiNodeCall(node_list, "drbd_helper", [])
1058
1059 @classmethod
1060 @_RpcTimeout(_TMO_NORMAL)
1062 """Upload a file.
1063
1064 The node will refuse the operation in case the file is not on the
1065 approved file list.
1066
1067 This is a multi-node call.
1068
1069 @type node_list: list
1070 @param node_list: the list of node names to upload to
1071 @type file_name: str
1072 @param file_name: the filename to upload
1073 @type address_list: list or None
1074 @keyword address_list: an optional list of node addresses, in order
1075 to optimize the RPC speed
1076
1077 """
1078 file_contents = utils.ReadFile(file_name)
1079 data = cls._Compress(file_contents)
1080 st = os.stat(file_name)
1081 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1082 st.st_atime, st.st_mtime]
1083 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1084 address_list=address_list)
1085
1086 @classmethod
1087 @_RpcTimeout(_TMO_NORMAL)
1089 """Write ssconf files.
1090
1091 This is a multi-node call.
1092
1093 """
1094 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1095
1096 @_RpcTimeout(_TMO_FAST)
1098 """Request a diagnose of OS definitions.
1099
1100 This is a multi-node call.
1101
1102 """
1103 return self._MultiNodeCall(node_list, "os_diagnose", [])
1104
1105 @_RpcTimeout(_TMO_FAST)
1107 """Returns an OS definition.
1108
1109 This is a single-node call.
1110
1111 """
1112 result = self._SingleNodeCall(node, "os_get", [name])
1113 if not result.fail_msg and isinstance(result.payload, dict):
1114 result.payload = objects.OS.FromDict(result.payload)
1115 return result
1116
1117 @_RpcTimeout(_TMO_FAST)
1119 """Run a validation routine for a given OS.
1120
1121 This is a multi-node call.
1122
1123 """
1124 return self._MultiNodeCall(nodes, "os_validate",
1125 [required, name, checks, params])
1126
1127 @_RpcTimeout(_TMO_NORMAL)
1129 """Call the hooks runner.
1130
1131 Args:
1132 - op: the OpCode instance
1133 - env: a dictionary with the environment
1134
1135 This is a multi-node call.
1136
1137 """
1138 params = [hpath, phase, env]
1139 return self._MultiNodeCall(node_list, "hooks_runner", params)
1140
1141 @_RpcTimeout(_TMO_NORMAL)
1143 """Call an iallocator on a remote node
1144
1145 Args:
1146 - name: the iallocator name
- idata: the json-encoded input string
1148
1149 This is a single-node call.
1150
1151 """
1152 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1153
1154 @_RpcTimeout(_TMO_NORMAL)
"""Request growing of the given block device by a given amount.
1157
1158 This is a single-node call.
1159
1160 """
1161 return self._SingleNodeCall(node, "blockdev_grow",
1162 [cf_bdev.ToDict(), amount])
1163
1164 @_RpcTimeout(_TMO_1DAY)
1167 """Export a given disk to another node.
1168
1169 This is a single-node call.
1170
1171 """
1172 return self._SingleNodeCall(node, "blockdev_export",
1173 [cf_bdev.ToDict(), dest_node, dest_path,
1174 cluster_name])
1175
1176 @_RpcTimeout(_TMO_NORMAL)
1178 """Request a snapshot of the given block device.
1179
1180 This is a single-node call.
1181
1182 """
1183 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1184
1185 @_RpcTimeout(_TMO_NORMAL)
1187 """Request the completion of an export operation.
1188
1189 This writes the export config file, etc.
1190
1191 This is a single-node call.
1192
1193 """
1194 flat_disks = []
1195 for disk in snap_disks:
1196 if isinstance(disk, bool):
1197 flat_disks.append(disk)
1198 else:
1199 flat_disks.append(disk.ToDict())
1200
1201 return self._SingleNodeCall(node, "finalize_export",
1202 [self._InstDict(instance), flat_disks])
1203
1204 @_RpcTimeout(_TMO_FAST)
1206 """Queries the export information in a given path.
1207
1208 This is a single-node call.
1209
1210 """
1211 return self._SingleNodeCall(node, "export_info", [path])
1212
1213 @_RpcTimeout(_TMO_FAST)
1215 """Gets the stored exports list.
1216
1217 This is a multi-node call.
1218
1219 """
1220 return self._MultiNodeCall(node_list, "export_list", [])
1221
1222 @_RpcTimeout(_TMO_FAST)
1224 """Requests removal of a given export.
1225
1226 This is a single-node call.
1227
1228 """
1229 return self._SingleNodeCall(node, "export_remove", [export])
1230
1231 @classmethod
1232 @_RpcTimeout(_TMO_NORMAL)
1234 """Requests a node to clean the cluster information it has.
1235
1236 This will remove the configuration information from the ganeti data
1237 dir.
1238
1239 This is a single-node call.
1240
1241 """
1242 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1243 [modify_ssh_setup])
1244
1245 @_RpcTimeout(_TMO_FAST)
1247 """Gets all volumes on node(s).
1248
1249 This is a multi-node call.
1250
1251 """
1252 return self._MultiNodeCall(node_list, "node_volumes", [])
1253
1254 @_RpcTimeout(_TMO_FAST)
1256 """Demote a node from the master candidate role.
1257
1258 This is a single-node call.
1259
1260 """
1261 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1262
1263 @_RpcTimeout(_TMO_NORMAL)
1265 """Tries to powercycle a node.
1266
1267 This is a single-node call.
1268
1269 """
1270 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1271
1272 @_RpcTimeout(None)
1274 """Sleep for a fixed time on given node(s).
1275
1276 This is a multi-node call.
1277
1278 """
1279 return self._MultiNodeCall(node_list, "test_delay", [duration],
1280 read_timeout=int(duration + 5))
1281
1282 @_RpcTimeout(_TMO_FAST)
1284 """Create the given file storage directory.
1285
1286 This is a single-node call.
1287
1288 """
1289 return self._SingleNodeCall(node, "file_storage_dir_create",
1290 [file_storage_dir])
1291
1292 @_RpcTimeout(_TMO_FAST)
1294 """Remove the given file storage directory.
1295
1296 This is a single-node call.
1297
1298 """
1299 return self._SingleNodeCall(node, "file_storage_dir_remove",
1300 [file_storage_dir])
1301
# NOTE(review): the "def" lines were absent from the reviewed text; the name
# and parameter order are reconstructed from the body -- confirm against
# callers.
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: node the request is sent to
  @param old_file_storage_dir: current directory; first RPC argument
  @param new_file_storage_dir: new directory; second RPC argument
  @return: whatever C{self._SingleNodeCall} returns for the
      C{file_storage_dir_rename} procedure

  """
  rpc_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
1312
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body -- confirm against callers.
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call, issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: nodes the request is sent to
  @param address_list: forwarded as the C{address_list} keyword argument
      of C{_StaticMultiNodeCall}
  @param file_name: first RPC argument
  @param content: file content; passed through C{cls._Compress} before
      being sent as the second RPC argument
  @return: whatever C{cls._StaticMultiNodeCall} returns for the
      C{jobqueue_update} procedure

  """
  rpc_args = [file_name, cls._Compress(content)]
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                  address_list=address_list)
1324
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# signature are reconstructed from the body -- confirm against callers.
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge job queue.

  This is a single-node call, issued through the class-level
  C{_StaticSingleNodeCall} helper.

  @param node: node the request is sent to
  @return: whatever C{cls._StaticSingleNodeCall} returns for the
      C{jobqueue_purge} procedure

  """
  # The remote procedure takes no arguments.
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1334
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body -- confirm against callers.
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call, issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: nodes the request is sent to
  @param address_list: forwarded as the C{address_list} keyword argument
      of C{_StaticMultiNodeCall}
  @param rename: used directly as the RPC argument list
  @return: whatever C{cls._StaticMultiNodeCall} returns for the
      C{jobqueue_rename} procedure

  """
  # Unlike the sibling calls, "rename" is not wrapped in a new list: it is
  # itself the argument list of the remote procedure.
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
1345
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body and the docstring -- confirm
# against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  The parameters sent to the nodes are the cluster-level parameters stored
  for C{hvname} (an empty dict if there are none), combined with
  C{hvparams} through L{objects.FillDict}.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @return: whatever C{self._MultiNodeCall} returns for the
      C{hypervisor_validate_params} procedure

  """
  cluster_defaults = self._cfg.GetClusterInfo().hvparams.get(hvname, {})
  hv_full = objects.FillDict(cluster_defaults, hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
1364
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body -- confirm against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Creates a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: node the request is sent to
  @type validity: int
  @param validity: Validity in seconds
  @return: whatever C{self._SingleNodeCall} returns for the
      C{x509_cert_create} procedure

  """
  return self._SingleNodeCall(node, "x509_cert_create", [validity])
1376
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body -- confirm against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Removes a X509 certificate.

  This is a single-node call.

  @param node: node the request is sent to
  @type name: string
  @param name: Certificate name
  @return: whatever C{self._SingleNodeCall} returns for the
      C{x509_cert_remove} procedure

  """
  return self._SingleNodeCall(node, "x509_cert_remove", [name])
1388
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body -- confirm against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: options object; its C{ToDict()} form is sent
  @type instance: C{objects.Instance}
  @param instance: Instance object; sent as encoded by C{self._InstDict}
  @param dest: sent unchanged, and also used to select the encoding of
      C{dest_args}
  @param dest_args: encoded through C{_EncodeImportExportIO} before being
      sent
  @return: whatever C{self._SingleNodeCall} returns for the
      C{import_start} procedure

  """
  rpc_args = [
    opts.ToDict(),
    self._InstDict(instance),
    dest,
    _EncodeImportExportIO(dest, dest_args),
    ]
  return self._SingleNodeCall(node, "import_start", rpc_args)
1405
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: options object; its C{ToDict()} form is sent
  @param host: sent unchanged as the second RPC argument
  @param port: sent unchanged as the third RPC argument
  @type instance: C{objects.Instance}
  @param instance: Instance object; sent as encoded by C{self._InstDict}
  @param source: sent unchanged, and also used to select the encoding of
      C{source_args}
  @param source_args: encoded through C{_EncodeImportExportIO} before
      being sent
  @return: whatever C{self._SingleNodeCall} returns for the
      C{export_start} procedure

  """
  rpc_args = [
    opts.ToDict(),
    host,
    port,
    self._InstDict(instance),
    source,
    _EncodeImportExportIO(source, source_args),
    ]
  return self._SingleNodeCall(node, "export_start", rpc_args)
1423
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body and the docstring -- confirm
# against callers.
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  # Only a successful call carries a payload worth decoding; on failure the
  # result is handed back untouched.
  if not result.fail_msg:
    decoded = []

    for entry in result.payload:
      # None entries are kept as-is; everything else is turned back into
      # an ImportExportStatus object.
      if entry is not None:
        entry = objects.ImportExportStatus.FromDict(entry)
      decoded.append(entry)

    result.payload = decoded

  return result
1453
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body and the docstring -- confirm
# against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Aborts an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name
  @return: whatever C{self._SingleNodeCall} returns for the
      C{impexp_abort} procedure

  """
  return self._SingleNodeCall(node, "impexp_abort", [name])
1467
# NOTE(review): the "def" line was absent from the reviewed text; the name and
# parameter order are reconstructed from the body and the docstring -- confirm
# against callers.
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Cleans up after an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name
  @return: whatever C{self._SingleNodeCall} returns for the
      C{impexp_cleanup} procedure

  """
  return self._SingleNodeCall(node, "impexp_cleanup", [name])
1481