"""Instance-related functions and classes for masterd.

"""

import logging
import time
import OpenSSL

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects
from ganeti import netutils


class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """


class ImportExportTimeouts(object):
  DEFAULT_READY_TIMEOUT = 10

  DEFAULT_ERROR_TIMEOUT = 10

  DEFAULT_LISTEN_TIMEOUT = 10

  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
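
# Example (illustrative): callers usually override only the connect timeout
# and keep the defaults for everything else, e.g.:
#
#   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
#
# as done in TransferInstanceData() below.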


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
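
# Illustrative only (not part of the module's API): callers derive from
# ImportExportCbBase and override just the callbacks they care about, e.g. a
# hypothetical subclass that only logs the final result:
#
#   class _QuietCb(ImportExportCbBase):
#     def ReportFinished(self, ie, private):
#       logging.info("Transfer finished, success=%s", ie.success)
#
# The concrete callback classes below (_TransferInstSourceCb, _RemoteExportCb,
# _RemoteImportCb, ...) follow the same pattern.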


def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  """
  return _time_fn() > (epoch + timeout)


class _DiskImportExportBase(object):
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import/export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being transferred
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import/export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set the daemon's connect timeout from the master-side timeouts
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns the parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to the import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name,
                                              self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating daemon status data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the epoch against which the connect timeout is calculated.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon is still running; maybe report progress
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Marks the transfer as finished and notifies the callbacks.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True


class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp when the daemon started listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening


class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          self._source, self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening; an export connects out, so there is
    # nothing to wait for here
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
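

# FormatProgress() is referenced by the progress callbacks further down, but
# its definition is not part of this excerpt.  The following is a minimal
# sketch, assuming the ganeti.utils helpers FormatUnit, FormatSeconds and
# CommaJoin; the progress tuple layout matches the "progress" property above.
def FormatProgress(progress):
  """Formats a progress tuple (mbytes, throughput, percent, eta) for humans.

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [
    # Total amount of data transferred so far
    utils.FormatUnit(mbytes, "h"),
    "%0.1f MiB/s" % throughput,
    ]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)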


class ImportExportLoop(object):
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # New objects are not added to the queue directly; Run() picks them up via
    # _AddPendingToQueue() at the start of each iteration
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect the names of all active daemons
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, check again shortly
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, check again shortly
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
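

# Typical usage (illustrative; see TransferInstanceData() and RemoteImport()
# below for the real call sites): create the loop, add DiskImport/DiskExport
# objects, run it, and always finalize, e.g.:
#
#   ieloop = ImportExportLoop(lu)
#   try:
#     ieloop.Add(DiskImport(lu, node_name, opts, instance, "disk0",
#                           dest_io, dest_ioargs, timeouts, cbs))
#     ieloop.Run()
#   finally:
#     ieloop.FinalizeAll()
#
# "node_name", "dest_io" etc. are placeholders here, not names defined by this
# module.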


class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # The receiving side won't get any more data; abort the import if the
    # export did not succeed
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # The sending side keeps pushing data; abort the export if the import did
    # not succeed
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @type success: bool

    """
    self.data = data
    self.success = success
    self.export_opts = export_opts

    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success


def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
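

# Example (illustrative): the same (base, instance_name, index) triple yields
# the same magic value on both sides of a transfer, which is how the import
# and export daemons for one disk recognize each other:
#
#   base = utils.GenerateSecret(6)
#   magic = _GetInstDiskMagic(base, instance.name, 0)
#
# TransferInstanceData() below uses exactly this pattern.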


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for each
    transfer

  """
  # Compression is not used for intra-cluster moves
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


class ExportInstanceHelper(object):
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload is the (vg, lv) pair identifying the snapshot volume
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # Each snapshot is dumped into a file in the export directory
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Removes all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1423
1424 -def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1425 cds, timeouts):
1426 """Imports an instance from another cluster.
1427
1428 @param lu: Logical unit instance
1429 @param feedback_fn: Feedback function
1430 @type instance: L{objects.Instance}
1431 @param instance: Instance object
1432 @type pnode: L{objects.Node}
1433 @param pnode: Primary node of instance as an object
1434 @type source_x509_ca: OpenSSL.crypto.X509
1435 @param source_x509_ca: Import source's X509 CA
1436 @type cds: string
1437 @param cds: Cluster domain secret
1438 @type timeouts: L{ImportExportTimeouts}
1439 @param timeouts: Timeouts for this import
1440
1441 """
1442 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1443 source_x509_ca)
1444
1445 magic_base = utils.GenerateSecret(6)
1446
1447
1448 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1449
1450
1451 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1452 constants.RIE_CERT_VALIDITY)
1453 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1454
1455 (x509_key_name, x509_cert_pem) = result.payload
1456 try:
1457
1458 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459 x509_cert_pem)
1460
1461
1462 signed_x509_cert_pem = \
1463 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1464
1465 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1466 len(instance.disks), pnode.primary_ip)
1467
1468 ieloop = ImportExportLoop(lu)
1469 try:
1470 for idx, dev in enumerate(instance.disks):
1471 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1472
1473
1474 opts = objects.ImportExportOptions(key_name=x509_key_name,
1475 ca_pem=source_ca_pem,
1476 magic=magic, ipv6=ipv6)
1477
1478 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1479 "disk%d" % idx,
1480 constants.IEIO_SCRIPT, (dev, idx),
1481 timeouts, cbs, private=(idx, )))
1482
1483 ieloop.Run()
1484 finally:
1485 ieloop.FinalizeAll()
1486 finally:
1487
1488 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1489 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1490
1491 return cbs.disk_results


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)


def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
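

# Example (illustrative): the peer builds the handshake as (version, HMAC,
# salt) over the message returned by _GetImportExportHandshakeMessage().  A
# well-formed tuple such as
#
#   salt = utils.GenerateSecret(8)
#   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
#   handshake = (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt),
#                salt)
#
# makes CheckRemoteExportHandshake(cds, handshake) return None (no error).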


def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)


def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, magic, hmac_digest, salt)
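

# Example (illustrative): the importing cluster publishes the signed tuple and
# the exporting cluster verifies it with the shared cluster domain secret:
#
#   info = ComputeRemoteImportDiskInfo(cds, utils.GenerateSecret(8),
#                                      0, "198.51.100.10", 11000, magic)
#   (host, port, magic) = CheckRemoteExportDiskInfo(cds, 0, info)
#
# A mismatching secret or a tampered field makes CheckRemoteExportDiskInfo
# raise errors.GenericError.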