1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
39 """Local exception to report import/export errors.
40
41 """
42
45
46 DEFAULT_READY_TIMEOUT = 10
47
48
49 DEFAULT_ERROR_TIMEOUT = 10
50
51
52 DEFAULT_LISTEN_TIMEOUT = 10
53
54
55 DEFAULT_PROGRESS_INTERVAL = 60
56
57 __slots__ = [
58 "error",
59 "ready",
60 "listen",
61 "connect",
62 "progress",
63 ]
64
70 """Initializes this class.
71
72 @type connect: number
73 @param connect: Timeout for establishing connection
74 @type listen: number
75 @param listen: Timeout for starting to listen for connections
76 @type error: number
77 @param error: Length of time until errors cause hard failure
78 @type ready: number
79 @param ready: Timeout for daemon to become ready
80 @type progress: number
81 @param progress: Progress update interval
82
83 """
84 self.error = error
85 self.ready = ready
86 self.listen = listen
87 self.connect = connect
88 self.progress = progress
89
92 """Callbacks for disk import/export.
93
94 """
96 """Called when daemon started listening.
97
98 @type ie: Subclass of L{_DiskImportExportBase}
99 @param ie: Import/export object
100 @param private: Private data passed to import/export object
101
102 """
103
105 """Called when a connection has been established.
106
107 @type ie: Subclass of L{_DiskImportExportBase}
108 @param ie: Import/export object
109 @param private: Private data passed to import/export object
110
111 """
112
114 """Called when new progress information should be reported.
115
116 @type ie: Subclass of L{_DiskImportExportBase}
117 @param ie: Import/export object
118 @param private: Private data passed to import/export object
119
120 """
121
123 """Called when a transfer has finished.
124
125 @type ie: Subclass of L{_DiskImportExportBase}
126 @param ie: Import/export object
127 @param private: Private data passed to import/export object
128
129 """
130
133 """Checks whether a timeout has expired.
134
135 """
136 return _time_fn() > (epoch + timeout)
137
140 MODE_TEXT = None
141
142 - def __init__(self, lu, node_name, opts,
143 instance, timeouts, cbs, private=None):
144 """Initializes this class.
145
146 @param lu: Logical unit instance
147 @type node_name: string
148 @param node_name: Node name for import
149 @type opts: L{objects.ImportExportOptions}
150 @param opts: Import/export daemon options
151 @type instance: L{objects.Instance}
152 @param instance: Instance object
153 @type timeouts: L{ImportExportTimeouts}
154 @param timeouts: Timeouts for this import
155 @type cbs: L{ImportExportCbBase}
156 @param cbs: Callbacks
157 @param private: Private data for callback functions
158
159 """
160 assert self.MODE_TEXT
161
162 self._lu = lu
163 self.node_name = node_name
164 self._opts = opts
165 self._instance = instance
166 self._timeouts = timeouts
167 self._cbs = cbs
168 self._private = private
169
170
171 self._loop = None
172
173
174 self._ts_begin = None
175 self._ts_connected = None
176 self._ts_finished = None
177 self._ts_cleanup = None
178 self._ts_last_progress = None
179 self._ts_last_error = None
180
181
182 self.success = None
183 self.final_message = None
184
185
186 self._daemon_name = None
187 self._daemon = None
188
189 @property
191 """Returns the most recent output from the daemon.
192
193 """
194 if self._daemon:
195 return self._daemon.recent_output
196
197 return None
198
199 @property
201 """Returns transfer progress information.
202
203 """
204 if not self._daemon:
205 return None
206
207 return (self._daemon.progress_mbytes,
208 self._daemon.progress_throughput,
209 self._daemon.progress_percent,
210 self._daemon.progress_eta)
211
212 @property
214 """Returns the magic value for this import/export.
215
216 """
217 return self._opts.magic
218
219 @property
221 """Determines whether this transport is still active.
222
223 """
224 return self.success is None
225
226 @property
228 """Returns parent loop.
229
230 @rtype: L{ImportExportLoop}
231
232 """
233 return self._loop
234
236 """Sets the parent loop.
237
238 @type loop: L{ImportExportLoop}
239
240 """
241 if self._loop:
242 raise errors.ProgrammerError("Loop can only be set once")
243
244 self._loop = loop
245
247 """Starts the import/export daemon.
248
249 """
250 raise NotImplementedError()
251
253 """Checks whether daemon has been started and if not, starts it.
254
255 @rtype: string
256 @return: Daemon name
257
258 """
259 assert self._ts_cleanup is None
260
261 if self._daemon_name is None:
262 assert self._ts_begin is None
263
264 result = self._StartDaemon()
265 if result.fail_msg:
266 raise _ImportExportError("Failed to start %s on %s: %s" %
267 (self.MODE_TEXT, self.node_name,
268 result.fail_msg))
269
270 daemon_name = result.payload
271
272 logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
273 self.node_name)
274
275 self._ts_begin = time.time()
276 self._daemon_name = daemon_name
277
278 return self._daemon_name
279
281 """Returns the daemon name.
282
283 """
284 assert self._daemon_name, "Daemon has not been started"
285 assert self._ts_cleanup is None
286 return self._daemon_name
287
289 """Sends SIGTERM to import/export daemon (if still active).
290
291 """
292 if self._daemon_name:
293 self._lu.LogWarning("Aborting %s %r on %s",
294 self.MODE_TEXT, self._daemon_name, self.node_name)
295 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
296 if result.fail_msg:
297 self._lu.LogWarning("Failed to abort %s %r on %s: %s",
298 self.MODE_TEXT, self._daemon_name,
299 self.node_name, result.fail_msg)
300 return False
301
302 return True
303
305 """Internal function for updating status daemon data.
306
307 @type data: L{objects.ImportExportStatus}
308 @param data: Daemon status data
309
310 """
311 assert self._ts_begin is not None
312
313 if not data:
314 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
315 raise _ImportExportError("Didn't become ready after %s seconds" %
316 self._timeouts.ready)
317
318 return False
319
320 self._daemon = data
321
322 return True
323
325 """Updates daemon status data.
326
327 @type success: bool
328 @param success: Whether fetching data was successful or not
329 @type data: L{objects.ImportExportStatus}
330 @param data: Daemon status data
331
332 """
333 if not success:
334 if self._ts_last_error is None:
335 self._ts_last_error = time.time()
336
337 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
338 raise _ImportExportError("Too many errors while updating data")
339
340 return False
341
342 self._ts_last_error = None
343
344 return self._SetDaemonData(data)
345
347 """Checks whether the daemon is listening.
348
349 """
350 raise NotImplementedError()
351
353 """Returns timeout to calculate connect timeout.
354
355 """
356 raise NotImplementedError()
357
359 """Checks whether the daemon is connected.
360
361 @rtype: bool
362 @return: Whether the daemon is connected
363
364 """
365 assert self._daemon, "Daemon status missing"
366
367 if self._ts_connected is not None:
368 return True
369
370 if self._daemon.connected:
371 self._ts_connected = time.time()
372
373
374 logging.debug("%s %r on %s is now connected",
375 self.MODE_TEXT, self._daemon_name, self.node_name)
376
377 self._cbs.ReportConnected(self, self._private)
378
379 return True
380
381 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
382 raise _ImportExportError("Not connected after %s seconds" %
383 self._timeouts.connect)
384
385 return False
386
388 """Checks whether a progress update should be reported.
389
390 """
391 if ((self._ts_last_progress is None or
392 _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
393 self._daemon and
394 self._daemon.progress_mbytes is not None and
395 self._daemon.progress_throughput is not None):
396 self._cbs.ReportProgress(self, self._private)
397 self._ts_last_progress = time.time()
398
400 """Checks whether the daemon exited.
401
402 @rtype: bool
403 @return: Whether the transfer is finished
404
405 """
406 assert self._daemon, "Daemon status missing"
407
408 if self._ts_finished:
409 return True
410
411 if self._daemon.exit_status is None:
412
413 self._CheckProgress()
414 return False
415
416 self._ts_finished = time.time()
417
418 self._ReportFinished(self._daemon.exit_status == 0,
419 self._daemon.error_message)
420
421 return True
422
424 """Transfer is finished or daemon exited.
425
426 @type success: bool
427 @param success: Whether the transfer was successful
428 @type message: string
429 @param message: Error message
430
431 """
432 assert self.success is None
433
434 self.success = success
435 self.final_message = message
436
437 if success:
438 logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
439 self.node_name)
440 elif self._daemon_name:
441 self._lu.LogWarning("%s %r on %s failed: %s",
442 self.MODE_TEXT, self._daemon_name, self.node_name,
443 message)
444 else:
445 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
446 self.node_name, message)
447
448 self._cbs.ReportFinished(self, self._private)
449
451 """Makes the RPC call to finalize this import/export.
452
453 """
454 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
455
457 """Finalizes this import/export.
458
459 """
460 if self._daemon_name:
461 logging.info("Finalizing %s %r on %s",
462 self.MODE_TEXT, self._daemon_name, self.node_name)
463
464 result = self._Finalize()
465 if result.fail_msg:
466 self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
467 self.MODE_TEXT, self._daemon_name,
468 self.node_name, result.fail_msg)
469 return False
470
471
472 self._daemon_name = None
473 self._ts_cleanup = time.time()
474
475 if error:
476 self._ReportFinished(False, error)
477
478 return True
479
482 MODE_TEXT = "import"
483
484 - def __init__(self, lu, node_name, opts, instance,
485 dest, dest_args, timeouts, cbs, private=None):
486 """Initializes this class.
487
488 @param lu: Logical unit instance
489 @type node_name: string
490 @param node_name: Node name for import
491 @type opts: L{objects.ImportExportOptions}
492 @param opts: Import/export daemon options
493 @type instance: L{objects.Instance}
494 @param instance: Instance object
495 @param dest: I/O destination
496 @param dest_args: I/O arguments
497 @type timeouts: L{ImportExportTimeouts}
498 @param timeouts: Timeouts for this import
499 @type cbs: L{ImportExportCbBase}
500 @param cbs: Callbacks
501 @param private: Private data for callback functions
502
503 """
504 _DiskImportExportBase.__init__(self, lu, node_name, opts,
505 instance, timeouts, cbs, private)
506 self._dest = dest
507 self._dest_args = dest_args
508
509
510 self._ts_listening = None
511
512 @property
514 """Returns the port the daemon is listening on.
515
516 """
517 if self._daemon:
518 return self._daemon.listen_port
519
520 return None
521
523 """Starts the import daemon.
524
525 """
526 return self._lu.rpc.call_import_start(self.node_name, self._opts,
527 self._instance,
528 self._dest, self._dest_args)
529
531 """Checks whether the daemon is listening.
532
533 @rtype: bool
534 @return: Whether the daemon is listening
535
536 """
537 assert self._daemon, "Daemon status missing"
538
539 if self._ts_listening is not None:
540 return True
541
542 port = self._daemon.listen_port
543 if port is not None:
544 self._ts_listening = time.time()
545
546 logging.debug("Import %r on %s is now listening on port %s",
547 self._daemon_name, self.node_name, port)
548
549 self._cbs.ReportListening(self, self._private)
550
551 return True
552
553 if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
554 raise _ImportExportError("Not listening after %s seconds" %
555 self._timeouts.listen)
556
557 return False
558
560 """Returns the time since we started listening.
561
562 """
563 assert self._ts_listening is not None, \
564 ("Checking whether an import is connected is only useful"
565 " once it's been listening")
566
567 return self._ts_listening
568
571 MODE_TEXT = "export"
572
573 - def __init__(self, lu, node_name, opts,
574 dest_host, dest_port, instance, source, source_args,
575 timeouts, cbs, private=None):
576 """Initializes this class.
577
578 @param lu: Logical unit instance
579 @type node_name: string
580 @param node_name: Node name for import
581 @type opts: L{objects.ImportExportOptions}
582 @param opts: Import/export daemon options
583 @type dest_host: string
584 @param dest_host: Destination host name or IP address
585 @type dest_port: number
586 @param dest_port: Destination port number
587 @type instance: L{objects.Instance}
588 @param instance: Instance object
589 @param source: I/O source
590 @param source_args: I/O source
591 @type timeouts: L{ImportExportTimeouts}
592 @param timeouts: Timeouts for this import
593 @type cbs: L{ImportExportCbBase}
594 @param cbs: Callbacks
595 @param private: Private data for callback functions
596
597 """
598 _DiskImportExportBase.__init__(self, lu, node_name, opts,
599 instance, timeouts, cbs, private)
600 self._dest_host = dest_host
601 self._dest_port = dest_port
602 self._source = source
603 self._source_args = source_args
604
606 """Starts the export daemon.
607
608 """
609 return self._lu.rpc.call_export_start(self.node_name, self._opts,
610 self._dest_host, self._dest_port,
611 self._instance, self._source,
612 self._source_args)
613
615 """Checks whether the daemon is listening.
616
617 """
618
619 return True
620
622 """Returns the time since the daemon started.
623
624 """
625 assert self._ts_begin is not None
626
627 return self._ts_begin
628
650
653 MIN_DELAY = 1.0
654 MAX_DELAY = 20.0
655
657 """Initializes this class.
658
659 """
660 self._lu = lu
661 self._queue = []
662 self._pending_add = []
663
664 - def Add(self, diskie):
665 """Adds an import/export object to the loop.
666
667 @type diskie: Subclass of L{_DiskImportExportBase}
668 @param diskie: Import/export object
669
670 """
671 assert diskie not in self._pending_add
672 assert diskie.loop is None
673
674 diskie.SetLoop(self)
675
676
677
678
679 self._pending_add.append(diskie)
680
681 @staticmethod
683 """Collects the status for all import/export daemons.
684
685 """
686 daemon_status = {}
687
688 for node_name, names in daemons.iteritems():
689 result = lu.rpc.call_impexp_status(node_name, names)
690 if result.fail_msg:
691 lu.LogWarning("Failed to get daemon status on %s: %s",
692 node_name, result.fail_msg)
693 continue
694
695 assert len(names) == len(result.payload)
696
697 daemon_status[node_name] = dict(zip(names, result.payload))
698
699 return daemon_status
700
701 @staticmethod
703 """Gets the names of all active daemons.
704
705 """
706 result = {}
707 for diskie in queue:
708 if not diskie.active:
709 continue
710
711 try:
712
713 daemon_name = diskie.CheckDaemon()
714 except _ImportExportError, err:
715 logging.exception("%s failed", diskie.MODE_TEXT)
716 diskie.Finalize(error=str(err))
717 continue
718
719 result.setdefault(diskie.node_name, []).append(daemon_name)
720
721 assert len(queue) >= len(result)
722 assert len(queue) >= sum([len(names) for names in result.itervalues()])
723
724 logging.debug("daemons=%r", result)
725
726 return result
727
729 """Adds all pending import/export objects to the internal queue.
730
731 """
732 assert compat.all(diskie not in self._queue and diskie.loop == self
733 for diskie in self._pending_add)
734
735 self._queue.extend(self._pending_add)
736
737 del self._pending_add[:]
738
740 """Utility main loop.
741
742 """
743 while True:
744 self._AddPendingToQueue()
745
746
747 daemons = self._GetActiveDaemonNames(self._queue)
748 if not daemons:
749 break
750
751
752 data = self._CollectDaemonStatus(self._lu, daemons)
753
754
755 delay = self.MAX_DELAY
756 for diskie in self._queue:
757 if not diskie.active:
758 continue
759
760 try:
761 try:
762 all_daemon_data = data[diskie.node_name]
763 except KeyError:
764 result = diskie.SetDaemonData(False, None)
765 else:
766 result = \
767 diskie.SetDaemonData(True,
768 all_daemon_data[diskie.GetDaemonName()])
769
770 if not result:
771
772 delay = min(3.0, delay)
773 continue
774
775 if diskie.CheckFinished():
776
777 diskie.Finalize()
778 continue
779
780
781 delay = min(5.0, delay)
782
783 if not diskie.CheckListening():
784
785 delay = min(1.0, delay)
786 continue
787
788 if not diskie.CheckConnected():
789
790 delay = min(1.0, delay)
791 continue
792
793 except _ImportExportError, err:
794 logging.exception("%s failed", diskie.MODE_TEXT)
795 diskie.Finalize(error=str(err))
796
797 if not compat.any(diskie.active for diskie in self._queue):
798 break
799
800
801 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
802 logging.debug("Waiting for %ss", delay)
803 time.sleep(delay)
804
806 """Finalizes all pending transfers.
807
808 """
809 success = True
810
811 for diskie in self._queue:
812 success = diskie.Finalize() and success
813
814 return success
815
818 - def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
819 dest_node, dest_ip):
820 """Initializes this class.
821
822 """
823 ImportExportCbBase.__init__(self)
824
825 self.lu = lu
826 self.feedback_fn = feedback_fn
827 self.instance = instance
828 self.timeouts = timeouts
829 self.src_node = src_node
830 self.src_cbs = src_cbs
831 self.dest_node = dest_node
832 self.dest_ip = dest_ip
833
837 """Called when a connection has been established.
838
839 """
840 assert self.src_cbs is None
841 assert dtp.src_export == ie
842 assert dtp.dest_import
843
844 self.feedback_fn("%s is sending data on %s" %
845 (dtp.data.name, ie.node_name))
846
856
858 """Called when a transfer has finished.
859
860 """
861 assert self.src_cbs is None
862 assert dtp.src_export == ie
863 assert dtp.dest_import
864
865 if ie.success:
866 self.feedback_fn("%s finished sending data" % dtp.data.name)
867 else:
868 self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
869 (dtp.data.name, ie.final_message, ie.recent_output))
870
871 dtp.RecordResult(ie.success)
872
873 cb = dtp.data.finished_fn
874 if cb:
875 cb()
876
877
878
879 if dtp.dest_import and not ie.success:
880 dtp.dest_import.Abort()
881
885 """Called when daemon started listening.
886
887 """
888 assert self.src_cbs
889 assert dtp.src_export is None
890 assert dtp.dest_import
891 assert dtp.export_opts
892
893 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
894
895
896 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
897 self.dest_ip, ie.listen_port,
898 self.instance, dtp.data.src_io, dtp.data.src_ioargs,
899 self.timeouts, self.src_cbs, private=dtp)
900 ie.loop.Add(de)
901
902 dtp.src_export = de
903
905 """Called when a connection has been established.
906
907 """
908 self.feedback_fn("%s is receiving data on %s" %
909 (dtp.data.name, self.dest_node))
910
912 """Called when a transfer has finished.
913
914 """
915 if ie.success:
916 self.feedback_fn("%s finished receiving data" % dtp.data.name)
917 else:
918 self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
919 (dtp.data.name, ie.final_message, ie.recent_output))
920
921 dtp.RecordResult(ie.success)
922
923
924
925 if dtp.src_export and not ie.success:
926 dtp.src_export.Abort()
927
930 - def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
931 finished_fn):
932 """Initializes this class.
933
934 @type name: string
935 @param name: User-visible name for this transfer (e.g. "disk/0")
936 @param src_io: Source I/O type
937 @param src_ioargs: Source I/O arguments
938 @param dest_io: Destination I/O type
939 @param dest_ioargs: Destination I/O arguments
940 @type finished_fn: callable
941 @param finished_fn: Function called once transfer has finished
942
943 """
944 self.name = name
945
946 self.src_io = src_io
947 self.src_ioargs = src_ioargs
948
949 self.dest_io = dest_io
950 self.dest_ioargs = dest_ioargs
951
952 self.finished_fn = finished_fn
953
956 - def __init__(self, data, success, export_opts):
957 """Initializes this class.
958
959 @type data: L{DiskTransfer}
960 @type success: bool
961
962 """
963 self.data = data
964 self.success = success
965 self.export_opts = export_opts
966
967 self.src_export = None
968 self.dest_import = None
969
971 """Updates the status.
972
973 One failed part will cause the whole transfer to fail.
974
975 """
976 self.success = self.success and success
977
980 """Computes the magic value for a disk export or import.
981
982 @type base: string
983 @param base: Random seed value (can be the same for all disks of a transfer)
984 @type instance_name: string
985 @param instance_name: Name of instance
986 @type index: number
987 @param index: Disk index
988
989 """
990 h = compat.sha1_hash()
991 h.update(str(constants.RIE_VERSION))
992 h.update(base)
993 h.update(instance_name)
994 h.update(str(index))
995 return h.hexdigest()
996
997
998 -def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
999 instance, all_transfers):
1000 """Transfers an instance's data from one node to another.
1001
1002 @param lu: Logical unit instance
1003 @param feedback_fn: Feedback function
1004 @type src_node: string
1005 @param src_node: Source node name
1006 @type dest_node: string
1007 @param dest_node: Destination node name
1008 @type dest_ip: string
1009 @param dest_ip: IP address of destination node
1010 @type instance: L{objects.Instance}
1011 @param instance: Instance object
1012 @type all_transfers: list of L{DiskTransfer} instances
1013 @param all_transfers: List of all disk transfers to be made
1014 @rtype: list
1015 @return: List with a boolean (True=successful, False=failed) for success for
1016 each transfer
1017
1018 """
1019
1020 compress = constants.IEC_NONE
1021
1022 logging.debug("Source node %s, destination node %s, compression '%s'",
1023 src_node, dest_node, compress)
1024
1025 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1026 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1027 src_node, None, dest_node, dest_ip)
1028 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1029 src_node, src_cbs, dest_node, dest_ip)
1030
1031 all_dtp = []
1032
1033 base_magic = utils.GenerateSecret(6)
1034
1035 ieloop = ImportExportLoop(lu)
1036 try:
1037 for idx, transfer in enumerate(all_transfers):
1038 if transfer:
1039 feedback_fn("Exporting %s from %s to %s" %
1040 (transfer.name, src_node, dest_node))
1041
1042 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1043 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1044 compress=compress, magic=magic)
1045
1046 dtp = _DiskTransferPrivate(transfer, True, opts)
1047
1048 di = DiskImport(lu, dest_node, opts, instance,
1049 transfer.dest_io, transfer.dest_ioargs,
1050 timeouts, dest_cbs, private=dtp)
1051 ieloop.Add(di)
1052
1053 dtp.dest_import = di
1054 else:
1055 dtp = _DiskTransferPrivate(None, False, None)
1056
1057 all_dtp.append(dtp)
1058
1059 ieloop.Run()
1060 finally:
1061 ieloop.FinalizeAll()
1062
1063 assert len(all_dtp) == len(all_transfers)
1064 assert compat.all((dtp.src_export is None or
1065 dtp.src_export.success is not None) and
1066 (dtp.dest_import is None or
1067 dtp.dest_import.success is not None)
1068 for dtp in all_dtp), \
1069 "Not all imports/exports are finalized"
1070
1071 return [bool(dtp.success) for dtp in all_dtp]
1072
1075 - def __init__(self, feedback_fn, disk_count):
1076 """Initializes this class.
1077
1078 """
1079 ImportExportCbBase.__init__(self)
1080 self._feedback_fn = feedback_fn
1081 self._dresults = [None] * disk_count
1082
1083 @property
1085 """Returns per-disk results.
1086
1087 """
1088 return self._dresults
1089
1091 """Called when a connection has been established.
1092
1093 """
1094 (idx, _) = private
1095
1096 self._feedback_fn("Disk %s is now sending data" % idx)
1097
1099 """Called when new progress information should be reported.
1100
1101 """
1102 (idx, _) = private
1103
1104 progress = ie.progress
1105 if not progress:
1106 return
1107
1108 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1109
1111 """Called when a transfer has finished.
1112
1113 """
1114 (idx, finished_fn) = private
1115
1116 if ie.success:
1117 self._feedback_fn("Disk %s finished sending data" % idx)
1118 else:
1119 self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1120 (idx, ie.final_message, ie.recent_output))
1121
1122 self._dresults[idx] = bool(ie.success)
1123
1124 if finished_fn:
1125 finished_fn()
1126
1129 - def __init__(self, lu, feedback_fn, instance):
1130 """Initializes this class.
1131
1132 @param lu: Logical unit instance
1133 @param feedback_fn: Feedback function
1134 @type instance: L{objects.Instance}
1135 @param instance: Instance object
1136
1137 """
1138 self._lu = lu
1139 self._feedback_fn = feedback_fn
1140 self._instance = instance
1141
1142 self._snap_disks = []
1143 self._removed_snaps = [False] * len(instance.disks)
1144
1146 """Creates an LVM snapshot for every disk of the instance.
1147
1148 """
1149 assert not self._snap_disks
1150
1151 instance = self._instance
1152 src_node = instance.primary_node
1153
1154 vgname = self._lu.cfg.GetVGName()
1155
1156 for idx, disk in enumerate(instance.disks):
1157 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1158 (idx, src_node))
1159
1160
1161
1162 result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1163 msg = result.fail_msg
1164 if msg:
1165 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1166 idx, src_node, msg)
1167 new_dev = False
1168 else:
1169 disk_id = (vgname, result.payload)
1170 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1171 logical_id=disk_id, physical_id=disk_id,
1172 iv_name=disk.iv_name)
1173
1174 self._snap_disks.append(new_dev)
1175
1176 assert len(self._snap_disks) == len(instance.disks)
1177 assert len(self._removed_snaps) == len(instance.disks)
1178
1180 """Removes an LVM snapshot.
1181
1182 @type disk_index: number
1183 @param disk_index: Index of the snapshot to be removed
1184
1185 """
1186 disk = self._snap_disks[disk_index]
1187 if disk and not self._removed_snaps[disk_index]:
1188 src_node = self._instance.primary_node
1189
1190 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1191 (disk_index, src_node))
1192
1193 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1194 if result.fail_msg:
1195 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1196 " %s: %s", disk_index, src_node, result.fail_msg)
1197 else:
1198 self._removed_snaps[disk_index] = True
1199
1201 """Intra-cluster instance export.
1202
1203 @type dest_node: L{objects.Node}
1204 @param dest_node: Destination node
1205
1206 """
1207 instance = self._instance
1208 src_node = instance.primary_node
1209
1210 assert len(self._snap_disks) == len(instance.disks)
1211
1212 transfers = []
1213
1214 for idx, dev in enumerate(self._snap_disks):
1215 if not dev:
1216 transfers.append(None)
1217 continue
1218
1219 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1220 dev.physical_id[1])
1221
1222 finished_fn = compat.partial(self._TransferFinished, idx)
1223
1224
1225 dt = DiskTransfer("snapshot/%s" % idx,
1226 constants.IEIO_SCRIPT, (dev, idx),
1227 constants.IEIO_FILE, (path, ),
1228 finished_fn)
1229 transfers.append(dt)
1230
1231
1232 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1233 src_node, dest_node.name,
1234 dest_node.secondary_ip,
1235 instance, transfers)
1236
1237 assert len(dresults) == len(instance.disks)
1238
1239 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1240 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1241 self._snap_disks)
1242 msg = result.fail_msg
1243 fin_resu = not msg
1244 if msg:
1245 self._lu.LogWarning("Could not finalize export for instance %s"
1246 " on node %s: %s", instance.name, dest_node.name, msg)
1247
1248 return (fin_resu, dresults)
1249
1250 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1251 """Inter-cluster instance export.
1252
1253 @type disk_info: list
1254 @param disk_info: Per-disk destination information
1255 @type key_name: string
1256 @param key_name: Name of X509 key to use
1257 @type dest_ca_pem: string
1258 @param dest_ca_pem: Destination X509 CA in PEM format
1259 @type timeouts: L{ImportExportTimeouts}
1260 @param timeouts: Timeouts for this import
1261
1262 """
1263 instance = self._instance
1264
1265 assert len(disk_info) == len(instance.disks)
1266
1267 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1268
1269 ieloop = ImportExportLoop(self._lu)
1270 try:
1271 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1272 disk_info)):
1273 opts = objects.ImportExportOptions(key_name=key_name,
1274 ca_pem=dest_ca_pem,
1275 magic=magic)
1276
1277 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1278 finished_fn = compat.partial(self._TransferFinished, idx)
1279 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1280 opts, host, port, instance,
1281 constants.IEIO_SCRIPT, (dev, idx),
1282 timeouts, cbs, private=(idx, finished_fn)))
1283
1284 ieloop.Run()
1285 finally:
1286 ieloop.FinalizeAll()
1287
1288 return (True, cbs.disk_results)
1289
1291 """Called once a transfer has finished.
1292
1293 @type idx: number
1294 @param idx: Disk index
1295
1296 """
1297 logging.debug("Transfer %s finished", idx)
1298 self._RemoveSnapshot(idx)
1299
1301 """Remove all snapshots.
1302
1303 """
1304 assert len(self._removed_snaps) == len(self._instance.disks)
1305 for idx in range(len(self._instance.disks)):
1306 self._RemoveSnapshot(idx)
1307
1310 - def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1311 external_address):
1312 """Initializes this class.
1313
1314 @type cds: string
1315 @param cds: Cluster domain secret
1316 @type x509_cert_pem: string
1317 @param x509_cert_pem: CA used for signing import key
1318 @type disk_count: number
1319 @param disk_count: Number of disks
1320 @type external_address: string
1321 @param external_address: External address of destination node
1322
1323 """
1324 ImportExportCbBase.__init__(self)
1325 self._feedback_fn = feedback_fn
1326 self._cds = cds
1327 self._x509_cert_pem = x509_cert_pem
1328 self._disk_count = disk_count
1329 self._external_address = external_address
1330
1331 self._dresults = [None] * disk_count
1332 self._daemon_port = [None] * disk_count
1333
1334 self._salt = utils.GenerateSecret(8)
1335
1336 @property
1338 """Returns per-disk results.
1339
1340 """
1341 return self._dresults
1342
1344 """Checks whether all daemons are listening.
1345
1346 If all daemons are listening, the information is sent to the client.
1347
1348 """
1349 if not compat.all(dp is not None for dp in self._daemon_port):
1350 return
1351
1352 host = self._external_address
1353
1354 disks = []
1355 for idx, (port, magic) in enumerate(self._daemon_port):
1356 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1357 idx, host, port, magic))
1358
1359 assert len(disks) == self._disk_count
1360
1361 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1362 "disks": disks,
1363 "x509_ca": self._x509_cert_pem,
1364 })
1365
1367 """Called when daemon started listening.
1368
1369 """
1370 (idx, ) = private
1371
1372 self._feedback_fn("Disk %s is now listening" % idx)
1373
1374 assert self._daemon_port[idx] is None
1375
1376 self._daemon_port[idx] = (ie.listen_port, ie.magic)
1377
1378 self._CheckAllListening()
1379
1381 """Called when a connection has been established.
1382
1383 """
1384 (idx, ) = private
1385
1386 self._feedback_fn("Disk %s is now receiving data" % idx)
1387
1389 """Called when a transfer has finished.
1390
1391 """
1392 (idx, ) = private
1393
1394
1395 self._daemon_port[idx] = None
1396
1397 if ie.success:
1398 self._feedback_fn("Disk %s finished receiving data" % idx)
1399 else:
1400 self._feedback_fn(("Disk %s failed to receive data: %s"
1401 " (recent output: %r)") %
1402 (idx, ie.final_message, ie.recent_output))
1403
1404 self._dresults[idx] = bool(ie.success)
1405
1406
1407 -def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1408 """Imports an instance from another cluster.
1409
1410 @param lu: Logical unit instance
1411 @param feedback_fn: Feedback function
1412 @type instance: L{objects.Instance}
1413 @param instance: Instance object
1414 @type source_x509_ca: OpenSSL.crypto.X509
1415 @param source_x509_ca: Import source's X509 CA
1416 @type cds: string
1417 @param cds: Cluster domain secret
1418 @type timeouts: L{ImportExportTimeouts}
1419 @param timeouts: Timeouts for this import
1420
1421 """
1422 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1423 source_x509_ca)
1424
1425 magic_base = utils.GenerateSecret(6)
1426
1427
1428 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1429 constants.RIE_CERT_VALIDITY)
1430 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1431
1432 (x509_key_name, x509_cert_pem) = result.payload
1433 try:
1434
1435 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1436 x509_cert_pem)
1437
1438
1439 signed_x509_cert_pem = \
1440 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1441
1442 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1443 len(instance.disks), instance.primary_node)
1444
1445 ieloop = ImportExportLoop(lu)
1446 try:
1447 for idx, dev in enumerate(instance.disks):
1448 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1449
1450
1451 opts = objects.ImportExportOptions(key_name=x509_key_name,
1452 ca_pem=source_ca_pem,
1453 magic=magic)
1454
1455 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1456 constants.IEIO_SCRIPT, (dev, idx),
1457 timeouts, cbs, private=(idx, )))
1458
1459 ieloop.Run()
1460 finally:
1461 ieloop.FinalizeAll()
1462 finally:
1463
1464 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1465 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1466
1467 return cbs.disk_results
1468
1471 """Returns the handshake message for a RIE protocol version.
1472
1473 @type version: number
1474
1475 """
1476 return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1477
1489
1492 """Checks the handshake of a remote import/export.
1493
1494 @type cds: string
1495 @param cds: Cluster domain secret
1496 @type handshake: sequence
1497 @param handshake: Handshake sent by remote peer
1498
1499 """
1500 try:
1501 (version, hmac_digest, hmac_salt) = handshake
1502 except (TypeError, ValueError), err:
1503 return "Invalid data: %s" % err
1504
1505 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1506 hmac_digest, salt=hmac_salt):
1507 return "Hash didn't match, clusters don't share the same domain secret"
1508
1509 if version != constants.RIE_VERSION:
1510 return ("Clusters don't have the same remote import/export protocol"
1511 " (local=%s, remote=%s)" %
1512 (constants.RIE_VERSION, version))
1513
1514 return None
1515
1518 """Returns the hashed text for import/export disk information.
1519
1520 @type disk_index: number
1521 @param disk_index: Index of disk (included in hash)
1522 @type host: string
1523 @param host: Hostname
1524 @type port: number
1525 @param port: Daemon port
1526 @type magic: string
1527 @param magic: Magic value
1528
1529 """
1530 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1531
1534 """Verifies received disk information for an export.
1535
1536 @type cds: string
1537 @param cds: Cluster domain secret
1538 @type disk_index: number
1539 @param disk_index: Index of disk (included in hash)
1540 @type disk_info: sequence
1541 @param disk_info: Disk information sent by remote peer
1542
1543 """
1544 try:
1545 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1546 except (TypeError, ValueError), err:
1547 raise errors.GenericError("Invalid data: %s" % err)
1548
1549 if not (host and port and magic):
1550 raise errors.GenericError("Missing destination host, port or magic")
1551
1552 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1553
1554 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1555 raise errors.GenericError("HMAC is wrong")
1556
1557 return (netutils.Hostname.GetNormalizedName(host),
1558 utils.ValidateServiceName(port),
1559 magic)
1560
1563 """Computes the signed disk information for a remote import.
1564
1565 @type cds: string
1566 @param cds: Cluster domain secret
1567 @type salt: string
1568 @param salt: HMAC salt
1569 @type disk_index: number
1570 @param disk_index: Index of disk (included in hash)
1571 @type host: string
1572 @param host: Hostname
1573 @type port: number
1574 @param port: Daemon port
1575 @type magic: string
1576 @param magic: Magic value
1577
1578 """
1579 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1580 hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1581 return (host, port, magic, hmac_digest, salt)
1582