22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
39 """Local exception to report import/export errors.
40
41 """
42
44 class ImportExportTimeouts(object):
46 DEFAULT_READY_TIMEOUT = 10
47
48
49 DEFAULT_ERROR_TIMEOUT = 10
50
51
52 DEFAULT_LISTEN_TIMEOUT = 10
53
54
55 DEFAULT_PROGRESS_INTERVAL = 60
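  # All timeout values and the progress interval above are expressed in
  # seconds.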
56
57 __slots__ = [
58 "error",
59 "ready",
60 "listen",
61 "connect",
62 "progress",
63 ]
64
70 """Initializes this class.
71
72 @type connect: number
73 @param connect: Timeout for establishing connection
74 @type listen: number
75 @param listen: Timeout for starting to listen for connections
76 @type error: number
77 @param error: Length of time until errors cause hard failure
78 @type ready: number
79 @param ready: Timeout for daemon to become ready
80 @type progress: number
81 @param progress: Progress update interval
82
83 """
84 self.error = error
85 self.ready = ready
86 self.listen = listen
87 self.connect = connect
88 self.progress = progress
89
92 """Callbacks for disk import/export.
93
94 """
96 """Called when daemon started listening.
97
98 @type ie: Subclass of L{_DiskImportExportBase}
99 @param ie: Import/export object
100 @param private: Private data passed to import/export object
101
102 """
103
105 """Called when a connection has been established.
106
107 @type ie: Subclass of L{_DiskImportExportBase}
108 @param ie: Import/export object
109 @param private: Private data passed to import/export object
110
111 """
112
114 """Called when new progress information should be reported.
115
116 @type ie: Subclass of L{_DiskImportExportBase}
117 @param ie: Import/export object
118 @param private: Private data passed to import/export object
119
120 """
121
123 """Called when a transfer has finished.
124
125 @type ie: Subclass of L{_DiskImportExportBase}
126 @param ie: Import/export object
127 @param private: Private data passed to import/export object
128
129 """
130
133 """Checks whether a timeout has expired.
134
135 """
136 return _time_fn() > (epoch + timeout)
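# Usage note: _TimeoutExpired(start_ts, limit) becomes true once more than
# 'limit' seconds have passed since 'start_ts'; '_time_fn' only exists so that
# tests can substitute a fake clock.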
137
139 class _DiskImportExportBase(object):
140   MODE_TEXT = None
141
142   def __init__(self, lu, node_name, opts,
143 instance, timeouts, cbs, private=None):
144 """Initializes this class.
145
146 @param lu: Logical unit instance
147 @type node_name: string
148 @param node_name: Node name for import
149 @type opts: L{objects.ImportExportOptions}
150 @param opts: Import/export daemon options
151 @type instance: L{objects.Instance}
152 @param instance: Instance object
153 @type timeouts: L{ImportExportTimeouts}
154 @param timeouts: Timeouts for this import
155 @type cbs: L{ImportExportCbBase}
156 @param cbs: Callbacks
157 @param private: Private data for callback functions
158
159 """
160 assert self.MODE_TEXT
161
162 self._lu = lu
163 self.node_name = node_name
164 self._opts = opts.Copy()
165 self._instance = instance
166 self._timeouts = timeouts
167 self._cbs = cbs
168 self._private = private
169
170
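    # The connect timeout is passed to the node-side daemon through its
    # options, so both ends enforce the same limit.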
171 assert self._opts.connect_timeout is None
172 self._opts.connect_timeout = timeouts.connect
173
174
175 self._loop = None
176
177
178 self._ts_begin = None
179 self._ts_connected = None
180 self._ts_finished = None
181 self._ts_cleanup = None
182 self._ts_last_progress = None
183 self._ts_last_error = None
184
185
186 self.success = None
187 self.final_message = None
188
189
190 self._daemon_name = None
191 self._daemon = None
192
193 @property
195 """Returns the most recent output from the daemon.
196
197 """
198 if self._daemon:
199 return "\n".join(self._daemon.recent_output)
200
201 return None
202
203 @property
205 """Returns transfer progress information.
206
207 """
208 if not self._daemon:
209 return None
210
211 return (self._daemon.progress_mbytes,
212 self._daemon.progress_throughput,
213 self._daemon.progress_percent,
214 self._daemon.progress_eta)
215
216 @property
218 """Returns the magic value for this import/export.
219
220 """
221 return self._opts.magic
222
223 @property
225 """Determines whether this transport is still active.
226
227 """
228 return self.success is None
229
230 @property
232 """Returns parent loop.
233
234 @rtype: L{ImportExportLoop}
235
236 """
237 return self._loop
238
240 """Sets the parent loop.
241
242 @type loop: L{ImportExportLoop}
243
244 """
245 if self._loop:
246 raise errors.ProgrammerError("Loop can only be set once")
247
248 self._loop = loop
249
251 """Starts the import/export daemon.
252
253 """
254 raise NotImplementedError()
255
257 """Checks whether daemon has been started and if not, starts it.
258
259 @rtype: string
260 @return: Daemon name
261
262 """
263 assert self._ts_cleanup is None
264
265 if self._daemon_name is None:
266 assert self._ts_begin is None
267
268 result = self._StartDaemon()
269 if result.fail_msg:
270 raise _ImportExportError("Failed to start %s on %s: %s" %
271 (self.MODE_TEXT, self.node_name,
272 result.fail_msg))
273
274 daemon_name = result.payload
275
276 logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
277 self.node_name)
278
279 self._ts_begin = time.time()
280 self._daemon_name = daemon_name
281
282 return self._daemon_name
283
285 """Returns the daemon name.
286
287 """
288 assert self._daemon_name, "Daemon has not been started"
289 assert self._ts_cleanup is None
290 return self._daemon_name
291
293 """Sends SIGTERM to import/export daemon (if still active).
294
295 """
296 if self._daemon_name:
297 self._lu.LogWarning("Aborting %s %r on %s",
298 self.MODE_TEXT, self._daemon_name, self.node_name)
299 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
300 if result.fail_msg:
301 self._lu.LogWarning("Failed to abort %s %r on %s: %s",
302 self.MODE_TEXT, self._daemon_name,
303 self.node_name, result.fail_msg)
304 return False
305
306 return True
307
309 """Internal function for updating status daemon data.
310
311 @type data: L{objects.ImportExportStatus}
312 @param data: Daemon status data
313
314 """
315 assert self._ts_begin is not None
316
317 if not data:
318 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
319 raise _ImportExportError("Didn't become ready after %s seconds" %
320 self._timeouts.ready)
321
322 return False
323
324 self._daemon = data
325
326 return True
327
329 """Updates daemon status data.
330
331 @type success: bool
332 @param success: Whether fetching data was successful or not
333 @type data: L{objects.ImportExportStatus}
334 @param data: Daemon status data
335
336 """
337 if not success:
338 if self._ts_last_error is None:
339 self._ts_last_error = time.time()
340
341 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
342 raise _ImportExportError("Too many errors while updating data")
343
344 return False
345
346 self._ts_last_error = None
347
348 return self._SetDaemonData(data)
349
351 """Checks whether the daemon is listening.
352
353 """
354 raise NotImplementedError()
355
357 """Returns timeout to calculate connect timeout.
358
359 """
360 raise NotImplementedError()
361
363 """Checks whether the daemon is connected.
364
365 @rtype: bool
366 @return: Whether the daemon is connected
367
368 """
369 assert self._daemon, "Daemon status missing"
370
371 if self._ts_connected is not None:
372 return True
373
374 if self._daemon.connected:
375 self._ts_connected = time.time()
376
377
378 logging.debug("%s %r on %s is now connected",
379 self.MODE_TEXT, self._daemon_name, self.node_name)
380
381 self._cbs.ReportConnected(self, self._private)
382
383 return True
384
385 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
386 raise _ImportExportError("Not connected after %s seconds" %
387 self._timeouts.connect)
388
389 return False
390
392 """Checks whether a progress update should be reported.
393
394 """
395 if ((self._ts_last_progress is None or
396 _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
397 self._daemon and
398 self._daemon.progress_mbytes is not None and
399 self._daemon.progress_throughput is not None):
400 self._cbs.ReportProgress(self, self._private)
401 self._ts_last_progress = time.time()
402
404 """Checks whether the daemon exited.
405
406 @rtype: bool
407 @return: Whether the transfer is finished
408
409 """
410 assert self._daemon, "Daemon status missing"
411
412 if self._ts_finished:
413 return True
414
415 if self._daemon.exit_status is None:
416
417 self._CheckProgress()
418 return False
419
420 self._ts_finished = time.time()
421
422 self._ReportFinished(self._daemon.exit_status == 0,
423 self._daemon.error_message)
424
425 return True
426
428 """Transfer is finished or daemon exited.
429
430 @type success: bool
431 @param success: Whether the transfer was successful
432 @type message: string
433 @param message: Error message
434
435 """
436 assert self.success is None
437
438 self.success = success
439 self.final_message = message
440
441 if success:
442 logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
443 self.node_name)
444 elif self._daemon_name:
445 self._lu.LogWarning("%s %r on %s failed: %s",
446 self.MODE_TEXT, self._daemon_name, self.node_name,
447 message)
448 else:
449 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 self.node_name, message)
451
452 self._cbs.ReportFinished(self, self._private)
453
455 """Makes the RPC call to finalize this import/export.
456
457 """
458 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459
461 """Finalizes this import/export.
462
463 """
464 if self._daemon_name:
465 logging.info("Finalizing %s %r on %s",
466 self.MODE_TEXT, self._daemon_name, self.node_name)
467
468 result = self._Finalize()
469 if result.fail_msg:
470 self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
471 self.MODE_TEXT, self._daemon_name,
472 self.node_name, result.fail_msg)
473 return False
474
475
476 self._daemon_name = None
477 self._ts_cleanup = time.time()
478
479 if error:
480 self._ReportFinished(False, error)
481
482 return True
483
485 class DiskImport(_DiskImportExportBase):
486   MODE_TEXT = "import"
487
488   def __init__(self, lu, node_name, opts, instance,
489 dest, dest_args, timeouts, cbs, private=None):
490 """Initializes this class.
491
492 @param lu: Logical unit instance
493 @type node_name: string
494 @param node_name: Node name for import
495 @type opts: L{objects.ImportExportOptions}
496 @param opts: Import/export daemon options
497 @type instance: L{objects.Instance}
498 @param instance: Instance object
499 @param dest: I/O destination
500 @param dest_args: I/O arguments
501 @type timeouts: L{ImportExportTimeouts}
502 @param timeouts: Timeouts for this import
503 @type cbs: L{ImportExportCbBase}
504 @param cbs: Callbacks
505 @param private: Private data for callback functions
506
507 """
508 _DiskImportExportBase.__init__(self, lu, node_name, opts,
509 instance, timeouts, cbs, private)
510 self._dest = dest
511 self._dest_args = dest_args
512
513
514 self._ts_listening = None
515
516 @property
518 """Returns the port the daemon is listening on.
519
520 """
521 if self._daemon:
522 return self._daemon.listen_port
523
524 return None
525
527 """Starts the import daemon.
528
529 """
530 return self._lu.rpc.call_import_start(self.node_name, self._opts,
531 self._instance,
532 self._dest, self._dest_args)
533
535 """Checks whether the daemon is listening.
536
537 @rtype: bool
538 @return: Whether the daemon is listening
539
540 """
541 assert self._daemon, "Daemon status missing"
542
543 if self._ts_listening is not None:
544 return True
545
546 port = self._daemon.listen_port
547 if port is not None:
548 self._ts_listening = time.time()
549
550 logging.debug("Import %r on %s is now listening on port %s",
551 self._daemon_name, self.node_name, port)
552
553 self._cbs.ReportListening(self, self._private)
554
555 return True
556
557 if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
558 raise _ImportExportError("Not listening after %s seconds" %
559 self._timeouts.listen)
560
561 return False
562
564 """Returns the time since we started listening.
565
566 """
567 assert self._ts_listening is not None, \
568 ("Checking whether an import is connected is only useful"
569 " once it's been listening")
570
571 return self._ts_listening
572
574 class DiskExport(_DiskImportExportBase):
575   MODE_TEXT = "export"
576
577   def __init__(self, lu, node_name, opts,
578 dest_host, dest_port, instance, source, source_args,
579 timeouts, cbs, private=None):
580 """Initializes this class.
581
582 @param lu: Logical unit instance
583 @type node_name: string
584     @param node_name: Node name for export
585 @type opts: L{objects.ImportExportOptions}
586 @param opts: Import/export daemon options
587 @type dest_host: string
588 @param dest_host: Destination host name or IP address
589 @type dest_port: number
590 @param dest_port: Destination port number
591 @type instance: L{objects.Instance}
592 @param instance: Instance object
593 @param source: I/O source
594     @param source_args: I/O source arguments
595 @type timeouts: L{ImportExportTimeouts}
596     @param timeouts: Timeouts for this export
597 @type cbs: L{ImportExportCbBase}
598 @param cbs: Callbacks
599 @param private: Private data for callback functions
600
601 """
602 _DiskImportExportBase.__init__(self, lu, node_name, opts,
603 instance, timeouts, cbs, private)
604 self._dest_host = dest_host
605 self._dest_port = dest_port
606 self._source = source
607 self._source_args = source_args
608
610 """Starts the export daemon.
611
612 """
613 return self._lu.rpc.call_export_start(self.node_name, self._opts,
614 self._dest_host, self._dest_port,
615 self._instance, self._source,
616 self._source_args)
617
619 """Checks whether the daemon is listening.
620
621 """
622
623 return True
624
626 """Returns the time since the daemon started.
627
628 """
629 assert self._ts_begin is not None
630
631 return self._ts_begin
632
654
656 class ImportExportLoop(object):
657   MIN_DELAY = 1.0
658 MAX_DELAY = 20.0
659
661 """Initializes this class.
662
663 """
664 self._lu = lu
665 self._queue = []
666 self._pending_add = []
667
668   def Add(self, diskie):
669 """Adds an import/export object to the loop.
670
671 @type diskie: Subclass of L{_DiskImportExportBase}
672 @param diskie: Import/export object
673
674 """
675 assert diskie not in self._pending_add
676 assert diskie.loop is None
677
678 diskie.SetLoop(self)
679
680
681
682
683 self._pending_add.append(diskie)
684
685 @staticmethod
687 """Collects the status for all import/export daemons.
688
689 """
690 daemon_status = {}
691
692 for node_name, names in daemons.iteritems():
693 result = lu.rpc.call_impexp_status(node_name, names)
694 if result.fail_msg:
695 lu.LogWarning("Failed to get daemon status on %s: %s",
696 node_name, result.fail_msg)
697 continue
698
699 assert len(names) == len(result.payload)
700
701 daemon_status[node_name] = dict(zip(names, result.payload))
702
703 return daemon_status
704
705 @staticmethod
707 """Gets the names of all active daemons.
708
709 """
710 result = {}
711 for diskie in queue:
712 if not diskie.active:
713 continue
714
715 try:
716
717 daemon_name = diskie.CheckDaemon()
718 except _ImportExportError, err:
719 logging.exception("%s failed", diskie.MODE_TEXT)
720 diskie.Finalize(error=str(err))
721 continue
722
723 result.setdefault(diskie.node_name, []).append(daemon_name)
724
725 assert len(queue) >= len(result)
726 assert len(queue) >= sum([len(names) for names in result.itervalues()])
727
728 logging.debug("daemons=%r", result)
729
730 return result
731
733 """Adds all pending import/export objects to the internal queue.
734
735 """
736 assert compat.all(diskie not in self._queue and diskie.loop == self
737 for diskie in self._pending_add)
738
739 self._queue.extend(self._pending_add)
740
741 del self._pending_add[:]
742
744 """Utility main loop.
745
746 """
747 while True:
748 self._AddPendingToQueue()
749
750
751 daemons = self._GetActiveDaemonNames(self._queue)
752 if not daemons:
753 break
754
755
756 data = self._CollectDaemonStatus(self._lu, daemons)
757
758
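      # Start from the longest delay; the per-transfer checks below shorten it
      # whenever something still needs attention, and the result is clamped to
      # [MIN_DELAY, MAX_DELAY] before sleeping.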
759 delay = self.MAX_DELAY
760 for diskie in self._queue:
761 if not diskie.active:
762 continue
763
764 try:
765 try:
766 all_daemon_data = data[diskie.node_name]
767 except KeyError:
768 result = diskie.SetDaemonData(False, None)
769 else:
770 result = \
771 diskie.SetDaemonData(True,
772 all_daemon_data[diskie.GetDaemonName()])
773
774 if not result:
775
776 delay = min(3.0, delay)
777 continue
778
779 if diskie.CheckFinished():
780
781 diskie.Finalize()
782 continue
783
784
785 delay = min(5.0, delay)
786
787 if not diskie.CheckListening():
788
789 delay = min(1.0, delay)
790 continue
791
792 if not diskie.CheckConnected():
793
794 delay = min(1.0, delay)
795 continue
796
797 except _ImportExportError, err:
798 logging.exception("%s failed", diskie.MODE_TEXT)
799 diskie.Finalize(error=str(err))
800
801 if not compat.any(diskie.active for diskie in self._queue):
802 break
803
804
805 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
806 logging.debug("Waiting for %ss", delay)
807 time.sleep(delay)
808
810 """Finalizes all pending transfers.
811
812 """
813 success = True
814
815 for diskie in self._queue:
816 success = diskie.Finalize() and success
817
818 return success
819
821 class _TransferInstCbBase(ImportExportCbBase):
822   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
823 dest_node, dest_ip):
824 """Initializes this class.
825
826 """
827 ImportExportCbBase.__init__(self)
828
829 self.lu = lu
830 self.feedback_fn = feedback_fn
831 self.instance = instance
832 self.timeouts = timeouts
833 self.src_node = src_node
834 self.src_cbs = src_cbs
835 self.dest_node = dest_node
836 self.dest_ip = dest_ip
837
841 """Called when a connection has been established.
842
843 """
844 assert self.src_cbs is None
845 assert dtp.src_export == ie
846 assert dtp.dest_import
847
848 self.feedback_fn("%s is sending data on %s" %
849 (dtp.data.name, ie.node_name))
850
860
862 """Called when a transfer has finished.
863
864 """
865 assert self.src_cbs is None
866 assert dtp.src_export == ie
867 assert dtp.dest_import
868
869 if ie.success:
870 self.feedback_fn("%s finished sending data" % dtp.data.name)
871 else:
872 self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
873 (dtp.data.name, ie.final_message, ie.recent_output))
874
875 dtp.RecordResult(ie.success)
876
877 cb = dtp.data.finished_fn
878 if cb:
879 cb()
880
881
882
883 if dtp.dest_import and not ie.success:
884 dtp.dest_import.Abort()
885
889 """Called when daemon started listening.
890
891 """
892 assert self.src_cbs
893 assert dtp.src_export is None
894 assert dtp.dest_import
895 assert dtp.export_opts
896
897 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
898
899
900 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
901 self.dest_ip, ie.listen_port,
902 self.instance, dtp.data.src_io, dtp.data.src_ioargs,
903 self.timeouts, self.src_cbs, private=dtp)
904 ie.loop.Add(de)
905
906 dtp.src_export = de
907
909 """Called when a connection has been established.
910
911 """
912 self.feedback_fn("%s is receiving data on %s" %
913 (dtp.data.name, self.dest_node))
914
916 """Called when a transfer has finished.
917
918 """
919 if ie.success:
920 self.feedback_fn("%s finished receiving data" % dtp.data.name)
921 else:
922 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
923 (dtp.data.name, ie.final_message, ie.recent_output))
924
925 dtp.RecordResult(ie.success)
926
927
928
929 if dtp.src_export and not ie.success:
930 dtp.src_export.Abort()
931
933 class DiskTransfer(object):
934   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
935 finished_fn):
936 """Initializes this class.
937
938 @type name: string
939 @param name: User-visible name for this transfer (e.g. "disk/0")
940 @param src_io: Source I/O type
941 @param src_ioargs: Source I/O arguments
942 @param dest_io: Destination I/O type
943 @param dest_ioargs: Destination I/O arguments
944 @type finished_fn: callable
945 @param finished_fn: Function called once transfer has finished
946
947 """
948 self.name = name
949
950 self.src_io = src_io
951 self.src_ioargs = src_ioargs
952
953 self.dest_io = dest_io
954 self.dest_ioargs = dest_ioargs
955
956 self.finished_fn = finished_fn
957
959 class _DiskTransferPrivate(object):
960   def __init__(self, data, success, export_opts):
961 """Initializes this class.
962
963 @type data: L{DiskTransfer}
964 @type success: bool
965
966 """
967 self.data = data
968 self.success = success
969 self.export_opts = export_opts
970
971 self.src_export = None
972 self.dest_import = None
973
975 """Updates the status.
976
977 One failed part will cause the whole transfer to fail.
978
979 """
980 self.success = self.success and success
981
984 """Computes the magic value for a disk export or import.
985
986 @type base: string
987 @param base: Random seed value (can be the same for all disks of a transfer)
988 @type instance_name: string
989 @param instance_name: Name of instance
990 @type index: number
991 @param index: Disk index
992
993 """
994 h = compat.sha1_hash()
995 h.update(str(constants.RIE_VERSION))
996 h.update(base)
997 h.update(instance_name)
998 h.update(str(index))
999 return h.hexdigest()
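# The same magic value is configured for both the import and the export daemon
# of a disk, which lets the daemons detect a connection that does not belong
# to this particular disk and instance.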
1000
1001
1002 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1003 instance, all_transfers):
1004 """Transfers an instance's data from one node to another.
1005
1006 @param lu: Logical unit instance
1007 @param feedback_fn: Feedback function
1008 @type src_node: string
1009 @param src_node: Source node name
1010 @type dest_node: string
1011 @param dest_node: Destination node name
1012 @type dest_ip: string
1013 @param dest_ip: IP address of destination node
1014 @type instance: L{objects.Instance}
1015 @param instance: Instance object
1016 @type all_transfers: list of L{DiskTransfer} instances
1017 @param all_transfers: List of all disk transfers to be made
1018 @rtype: list
1019   @return: List with a boolean (True=successful, False=failed) for each
1020       transfer
1021
1022 """
1023
1024 compress = constants.IEC_NONE
1025
1026 logging.debug("Source node %s, destination node %s, compression '%s'",
1027 src_node, dest_node, compress)
1028
1029 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1030 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1031 src_node, None, dest_node, dest_ip)
1032 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1033 src_node, src_cbs, dest_node, dest_ip)
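  # The destination callbacks keep a reference to the source callbacks: once
  # an import daemon reports that it is listening, _TransferInstDestCb starts
  # the matching DiskExport on the source node (see ReportListening above).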
1034
1035 all_dtp = []
1036
1037 base_magic = utils.GenerateSecret(6)
1038
1039 ieloop = ImportExportLoop(lu)
1040 try:
1041 for idx, transfer in enumerate(all_transfers):
1042 if transfer:
1043 feedback_fn("Exporting %s from %s to %s" %
1044 (transfer.name, src_node, dest_node))
1045
1046 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1047 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1048 compress=compress, magic=magic)
1049
1050 dtp = _DiskTransferPrivate(transfer, True, opts)
1051
1052 di = DiskImport(lu, dest_node, opts, instance,
1053 transfer.dest_io, transfer.dest_ioargs,
1054 timeouts, dest_cbs, private=dtp)
1055 ieloop.Add(di)
1056
1057 dtp.dest_import = di
1058 else:
1059 dtp = _DiskTransferPrivate(None, False, None)
1060
1061 all_dtp.append(dtp)
1062
1063 ieloop.Run()
1064 finally:
1065 ieloop.FinalizeAll()
1066
1067 assert len(all_dtp) == len(all_transfers)
1068 assert compat.all((dtp.src_export is None or
1069 dtp.src_export.success is not None) and
1070 (dtp.dest_import is None or
1071 dtp.dest_import.success is not None)
1072 for dtp in all_dtp), \
1073 "Not all imports/exports are finalized"
1074
1075 return [bool(dtp.success) for dtp in all_dtp]
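# A minimal usage sketch (node names, file paths and the surrounding objects
# are hypothetical):
#   transfers = [DiskTransfer("disk/0",
#                             constants.IEIO_FILE, ("/tmp/src-disk0",),
#                             constants.IEIO_FILE, ("/tmp/dst-disk0",),
#                             None)]
#   results = TransferInstanceData(lu, feedback_fn, "node1", "node2",
#                                  "192.0.2.10", instance, transfers)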
1076
1078 class _RemoteExportCb(ImportExportCbBase):
1079   def __init__(self, feedback_fn, disk_count):
1080 """Initializes this class.
1081
1082 """
1083 ImportExportCbBase.__init__(self)
1084 self._feedback_fn = feedback_fn
1085 self._dresults = [None] * disk_count
1086
1087 @property
1089 """Returns per-disk results.
1090
1091 """
1092 return self._dresults
1093
1095 """Called when a connection has been established.
1096
1097 """
1098 (idx, _) = private
1099
1100 self._feedback_fn("Disk %s is now sending data" % idx)
1101
1103 """Called when new progress information should be reported.
1104
1105 """
1106 (idx, _) = private
1107
1108 progress = ie.progress
1109 if not progress:
1110 return
1111
1112 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1113
1115 """Called when a transfer has finished.
1116
1117 """
1118 (idx, finished_fn) = private
1119
1120 if ie.success:
1121 self._feedback_fn("Disk %s finished sending data" % idx)
1122 else:
1123 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1124 (idx, ie.final_message, ie.recent_output))
1125
1126 self._dresults[idx] = bool(ie.success)
1127
1128 if finished_fn:
1129 finished_fn()
1130
1132 class ExportInstanceHelper(object):
1133   def __init__(self, lu, feedback_fn, instance):
1134 """Initializes this class.
1135
1136 @param lu: Logical unit instance
1137 @param feedback_fn: Feedback function
1138 @type instance: L{objects.Instance}
1139 @param instance: Instance object
1140
1141 """
1142 self._lu = lu
1143 self._feedback_fn = feedback_fn
1144 self._instance = instance
1145
1146 self._snap_disks = []
1147 self._removed_snaps = [False] * len(instance.disks)
1148
1150 """Creates an LVM snapshot for every disk of the instance.
1151
1152 """
1153 assert not self._snap_disks
1154
1155 instance = self._instance
1156 src_node = instance.primary_node
1157
1158 for idx, disk in enumerate(instance.disks):
1159 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1160 (idx, src_node))
1161
1162
1163
1164 result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1165 new_dev = False
1166 msg = result.fail_msg
1167 if msg:
1168 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1169 idx, src_node, msg)
1170 elif (not isinstance(result.payload, (tuple, list)) or
1171 len(result.payload) != 2):
1172 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1173 " result '%s'", idx, src_node, result.payload)
1174 else:
1175 disk_id = tuple(result.payload)
1176 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1177 logical_id=disk_id, physical_id=disk_id,
1178 iv_name=disk.iv_name)
1179
1180 self._snap_disks.append(new_dev)
1181
1182 assert len(self._snap_disks) == len(instance.disks)
1183 assert len(self._removed_snaps) == len(instance.disks)
1184
1186 """Removes an LVM snapshot.
1187
1188 @type disk_index: number
1189 @param disk_index: Index of the snapshot to be removed
1190
1191 """
1192 disk = self._snap_disks[disk_index]
1193 if disk and not self._removed_snaps[disk_index]:
1194 src_node = self._instance.primary_node
1195
1196 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1197 (disk_index, src_node))
1198
1199 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1200 if result.fail_msg:
1201 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1202 " %s: %s", disk_index, src_node, result.fail_msg)
1203 else:
1204 self._removed_snaps[disk_index] = True
1205
1207 """Intra-cluster instance export.
1208
1209 @type dest_node: L{objects.Node}
1210 @param dest_node: Destination node
1211
1212 """
1213 instance = self._instance
1214 src_node = instance.primary_node
1215
1216 assert len(self._snap_disks) == len(instance.disks)
1217
1218 transfers = []
1219
1220 for idx, dev in enumerate(self._snap_disks):
1221 if not dev:
1222 transfers.append(None)
1223 continue
1224
1225 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1226 dev.physical_id[1])
1227
1228 finished_fn = compat.partial(self._TransferFinished, idx)
1229
1230
1231 dt = DiskTransfer("snapshot/%s" % idx,
1232 constants.IEIO_SCRIPT, (dev, idx),
1233 constants.IEIO_FILE, (path, ),
1234 finished_fn)
1235 transfers.append(dt)
1236
1237
1238 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1239 src_node, dest_node.name,
1240 dest_node.secondary_ip,
1241 instance, transfers)
1242
1243 assert len(dresults) == len(instance.disks)
1244
1245 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1246 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1247 self._snap_disks)
1248 msg = result.fail_msg
1249 fin_resu = not msg
1250 if msg:
1251 self._lu.LogWarning("Could not finalize export for instance %s"
1252 " on node %s: %s", instance.name, dest_node.name, msg)
1253
1254 return (fin_resu, dresults)
1255
1256   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257 """Inter-cluster instance export.
1258
1259 @type disk_info: list
1260 @param disk_info: Per-disk destination information
1261 @type key_name: string
1262 @param key_name: Name of X509 key to use
1263 @type dest_ca_pem: string
1264 @param dest_ca_pem: Destination X509 CA in PEM format
1265 @type timeouts: L{ImportExportTimeouts}
1266     @param timeouts: Timeouts for this export
1267
1268 """
1269 instance = self._instance
1270
1271 assert len(disk_info) == len(instance.disks)
1272
1273 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1274
1275 ieloop = ImportExportLoop(self._lu)
1276 try:
1277 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1278 disk_info)):
1279
1280 ipv6 = netutils.IP6Address.IsValid(host)
1281
1282 opts = objects.ImportExportOptions(key_name=key_name,
1283 ca_pem=dest_ca_pem,
1284 magic=magic, ipv6=ipv6)
1285
1286 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1287 finished_fn = compat.partial(self._TransferFinished, idx)
1288 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1289 opts, host, port, instance,
1290 constants.IEIO_SCRIPT, (dev, idx),
1291 timeouts, cbs, private=(idx, finished_fn)))
1292
1293 ieloop.Run()
1294 finally:
1295 ieloop.FinalizeAll()
1296
1297 return (True, cbs.disk_results)
1298
1300 """Called once a transfer has finished.
1301
1302 @type idx: number
1303 @param idx: Disk index
1304
1305 """
1306 logging.debug("Transfer %s finished", idx)
1307 self._RemoveSnapshot(idx)
1308
1310 """Remove all snapshots.
1311
1312 """
1313 assert len(self._removed_snaps) == len(self._instance.disks)
1314 for idx in range(len(self._instance.disks)):
1315 self._RemoveSnapshot(idx)
1316
1318 class _RemoteImportCb(ImportExportCbBase):
1319   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1320 external_address):
1321 """Initializes this class.
1322
1323 @type cds: string
1324 @param cds: Cluster domain secret
1325 @type x509_cert_pem: string
1326 @param x509_cert_pem: CA used for signing import key
1327 @type disk_count: number
1328 @param disk_count: Number of disks
1329 @type external_address: string
1330 @param external_address: External address of destination node
1331
1332 """
1333 ImportExportCbBase.__init__(self)
1334 self._feedback_fn = feedback_fn
1335 self._cds = cds
1336 self._x509_cert_pem = x509_cert_pem
1337 self._disk_count = disk_count
1338 self._external_address = external_address
1339
1340 self._dresults = [None] * disk_count
1341 self._daemon_port = [None] * disk_count
1342
1343 self._salt = utils.GenerateSecret(8)
1344
1345 @property
1347 """Returns per-disk results.
1348
1349 """
1350 return self._dresults
1351
1353 """Checks whether all daemons are listening.
1354
1355 If all daemons are listening, the information is sent to the client.
1356
1357 """
1358 if not compat.all(dp is not None for dp in self._daemon_port):
1359 return
1360
1361 host = self._external_address
1362
1363 disks = []
1364 for idx, (port, magic) in enumerate(self._daemon_port):
1365 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1366 idx, host, port, magic))
1367
1368 assert len(disks) == self._disk_count
1369
1370 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1371 "disks": disks,
1372 "x509_ca": self._x509_cert_pem,
1373 })
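    # At this point the remote cluster has everything it needs to start its
    # exports: the signed per-disk endpoints and the signed CA certificate.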
1374
1376 """Called when daemon started listening.
1377
1378 """
1379 (idx, ) = private
1380
1381 self._feedback_fn("Disk %s is now listening" % idx)
1382
1383 assert self._daemon_port[idx] is None
1384
1385 self._daemon_port[idx] = (ie.listen_port, ie.magic)
1386
1387 self._CheckAllListening()
1388
1390 """Called when a connection has been established.
1391
1392 """
1393 (idx, ) = private
1394
1395 self._feedback_fn("Disk %s is now receiving data" % idx)
1396
1398 """Called when a transfer has finished.
1399
1400 """
1401 (idx, ) = private
1402
1403
1404 self._daemon_port[idx] = None
1405
1406 if ie.success:
1407 self._feedback_fn("Disk %s finished receiving data" % idx)
1408 else:
1409 self._feedback_fn(("Disk %s failed to receive data: %s"
1410 " (recent output: %s)") %
1411 (idx, ie.final_message, ie.recent_output))
1412
1413 self._dresults[idx] = bool(ie.success)
1414
1415
1416 -def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1417 cds, timeouts):
1418 """Imports an instance from another cluster.
1419
1420 @param lu: Logical unit instance
1421 @param feedback_fn: Feedback function
1422 @type instance: L{objects.Instance}
1423 @param instance: Instance object
1424 @type pnode: L{objects.Node}
1425 @param pnode: Primary node of instance as an object
1426 @type source_x509_ca: OpenSSL.crypto.X509
1427 @param source_x509_ca: Import source's X509 CA
1428 @type cds: string
1429 @param cds: Cluster domain secret
1430 @type timeouts: L{ImportExportTimeouts}
1431 @param timeouts: Timeouts for this import
1432
1433 """
1434 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1435 source_x509_ca)
1436
1437 magic_base = utils.GenerateSecret(6)
1438
1439
1440 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1441
1442
1443 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1444 constants.RIE_CERT_VALIDITY)
1445 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1446
1447 (x509_key_name, x509_cert_pem) = result.payload
1448 try:
1449
1450 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1451 x509_cert_pem)
1452
1453
1454 signed_x509_cert_pem = \
1455 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1456
1457 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1458 len(instance.disks), pnode.primary_ip)
1459
1460 ieloop = ImportExportLoop(lu)
1461 try:
1462 for idx, dev in enumerate(instance.disks):
1463 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1464
1465
1466 opts = objects.ImportExportOptions(key_name=x509_key_name,
1467 ca_pem=source_ca_pem,
1468 magic=magic, ipv6=ipv6)
1469
1470 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1471 constants.IEIO_SCRIPT, (dev, idx),
1472 timeouts, cbs, private=(idx, )))
1473
1474 ieloop.Run()
1475 finally:
1476 ieloop.FinalizeAll()
1477 finally:
1478
1479 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1480 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1481
1482 return cbs.disk_results
1483
1486 """Returns the handshake message for a RIE protocol version.
1487
1488 @type version: number
1489
1490 """
1491 return "%s:%s" % (version, constants.RIE_HANDSHAKE)
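# The handshake exchanged between clusters is a (version, HMAC, salt) tuple;
# CheckRemoteExportHandshake below recomputes the HMAC over this message with
# the local cluster domain secret and compares the result.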
1492
1504
1507 """Checks the handshake of a remote import/export.
1508
1509 @type cds: string
1510 @param cds: Cluster domain secret
1511 @type handshake: sequence
1512 @param handshake: Handshake sent by remote peer
1513
1514 """
1515 try:
1516 (version, hmac_digest, hmac_salt) = handshake
1517 except (TypeError, ValueError), err:
1518 return "Invalid data: %s" % err
1519
1520 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1521 hmac_digest, salt=hmac_salt):
1522 return "Hash didn't match, clusters don't share the same domain secret"
1523
1524 if version != constants.RIE_VERSION:
1525 return ("Clusters don't have the same remote import/export protocol"
1526 " (local=%s, remote=%s)" %
1527 (constants.RIE_VERSION, version))
1528
1529 return None
1530
1533 """Returns the hashed text for import/export disk information.
1534
1535 @type disk_index: number
1536 @param disk_index: Index of disk (included in hash)
1537 @type host: string
1538 @param host: Hostname
1539 @type port: number
1540 @param port: Daemon port
1541 @type magic: string
1542 @param magic: Magic value
1543
1544 """
1545 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
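# This text is only used as HMAC input: ComputeRemoteImportDiskInfo signs it
# with the cluster domain secret and CheckRemoteExportDiskInfo verifies the
# signature before accepting the destination host, port and magic.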
1546
1549 """Verifies received disk information for an export.
1550
1551 @type cds: string
1552 @param cds: Cluster domain secret
1553 @type disk_index: number
1554 @param disk_index: Index of disk (included in hash)
1555 @type disk_info: sequence
1556 @param disk_info: Disk information sent by remote peer
1557
1558 """
1559 try:
1560 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1561 except (TypeError, ValueError), err:
1562 raise errors.GenericError("Invalid data: %s" % err)
1563
1564 if not (host and port and magic):
1565 raise errors.GenericError("Missing destination host, port or magic")
1566
1567 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1568
1569 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1570 raise errors.GenericError("HMAC is wrong")
1571
1572 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1573 destination = host
1574 else:
1575 destination = netutils.Hostname.GetNormalizedName(host)
1576
1577 return (destination,
1578 utils.ValidateServiceName(port),
1579 magic)
1580
1583 """Computes the signed disk information for a remote import.
1584
1585 @type cds: string
1586 @param cds: Cluster domain secret
1587 @type salt: string
1588 @param salt: HMAC salt
1589 @type disk_index: number
1590 @param disk_index: Index of disk (included in hash)
1591 @type host: string
1592 @param host: Hostname
1593 @type port: number
1594 @param port: Daemon port
1595 @type magic: string
1596 @param magic: Magic value
1597
1598 """
1599 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1600 hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1601 return (host, port, magic, hmac_digest, salt)
1602