1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
39 """Local exception to report import/export errors.
40
41 """
42
45
46 DEFAULT_READY_TIMEOUT = 10
47
48
49 DEFAULT_ERROR_TIMEOUT = 10
50
51
52 DEFAULT_LISTEN_TIMEOUT = 10
53
54
55 DEFAULT_PROGRESS_INTERVAL = 60
56
57 __slots__ = [
58 "error",
59 "ready",
60 "listen",
61 "connect",
62 "progress",
63 ]
64
70 """Initializes this class.
71
72 @type connect: number
73 @param connect: Timeout for establishing connection
74 @type listen: number
75 @param listen: Timeout for starting to listen for connections
76 @type error: number
77 @param error: Length of time until errors cause hard failure
78 @type ready: number
79 @param ready: Timeout for daemon to become ready
80 @type progress: number
81 @param progress: Progress update interval
82
83 """
84 self.error = error
85 self.ready = ready
86 self.listen = listen
87 self.connect = connect
88 self.progress = progress
89
92 """Callbacks for disk import/export.
93
94 """
96 """Called when daemon started listening.
97
98 @type ie: Subclass of L{_DiskImportExportBase}
99 @param ie: Import/export object
100 @param private: Private data passed to import/export object
101 @param component: transfer component name
102
103 """
104
106 """Called when a connection has been established.
107
108 @type ie: Subclass of L{_DiskImportExportBase}
109 @param ie: Import/export object
110 @param private: Private data passed to import/export object
111
112 """
113
115 """Called when new progress information should be reported.
116
117 @type ie: Subclass of L{_DiskImportExportBase}
118 @param ie: Import/export object
119 @param private: Private data passed to import/export object
120
121 """
122
124 """Called when a transfer has finished.
125
126 @type ie: Subclass of L{_DiskImportExportBase}
127 @param ie: Import/export object
128 @param private: Private data passed to import/export object
129
130 """
131
134 MODE_TEXT = None
135
def __init__(self, lu, node_name, opts,
             instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    Stores the collaborators of one import/export and resets all
    per-transfer state (timestamps, result, daemon status) to C{None}.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import/export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options; C{opts.Copy()} is stored and
        the connect timeout is set on that copy
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being transferred
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import/export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    # Subclasses are expected to override MODE_TEXT with a non-empty value
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # The connect timeout must not have been chosen by the caller; it is
    # taken from the timeouts object and written to the copied options
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop, assigned at most once later on
    self._loop = None

    # Timestamps, filled in as the transfer progresses
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; "success" stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon name and most recent daemon status data
    self._daemon_name = None
    self._daemon = None
189
190 @property
192 """Returns the most recent output from the daemon.
193
194 """
195 if self._daemon:
196 return "\n".join(self._daemon.recent_output)
197
198 return None
199
200 @property
202 """Returns transfer progress information.
203
204 """
205 if not self._daemon:
206 return None
207
208 return (self._daemon.progress_mbytes,
209 self._daemon.progress_throughput,
210 self._daemon.progress_percent,
211 self._daemon.progress_eta)
212
213 @property
215 """Returns the magic value for this import/export.
216
217 """
218 return self._opts.magic
219
220 @property
222 """Determines whether this transport is still active.
223
224 """
225 return self.success is None
226
227 @property
229 """Returns parent loop.
230
231 @rtype: L{ImportExportLoop}
232
233 """
234 return self._loop
235
237 """Sets the parent loop.
238
239 @type loop: L{ImportExportLoop}
240
241 """
242 if self._loop:
243 raise errors.ProgrammerError("Loop can only be set once")
244
245 self._loop = loop
246
248 """Starts the import/export daemon.
249
250 """
251 raise NotImplementedError()
252
254 """Checks whether daemon has been started and if not, starts it.
255
256 @rtype: string
257 @return: Daemon name
258
259 """
260 assert self._ts_cleanup is None
261
262 if self._daemon_name is None:
263 assert self._ts_begin is None
264
265 result = self._StartDaemon()
266 if result.fail_msg:
267 raise _ImportExportError("Failed to start %s on %s: %s" %
268 (self.MODE_TEXT, self.node_name,
269 result.fail_msg))
270
271 daemon_name = result.payload
272
273 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
274 self.node_name)
275
276 self._ts_begin = time.time()
277 self._daemon_name = daemon_name
278
279 return self._daemon_name
280
282 """Returns the daemon name.
283
284 """
285 assert self._daemon_name, "Daemon has not been started"
286 assert self._ts_cleanup is None
287 return self._daemon_name
288
290 """Sends SIGTERM to import/export daemon (if still active).
291
292 """
293 if self._daemon_name:
294 self._lu.LogWarning("Aborting %s '%s' on %s",
295 self.MODE_TEXT, self._daemon_name, self.node_name)
296 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
297 if result.fail_msg:
298 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
299 self.MODE_TEXT, self._daemon_name,
300 self.node_name, result.fail_msg)
301 return False
302
303 return True
304
306 """Internal function for updating status daemon data.
307
308 @type data: L{objects.ImportExportStatus}
309 @param data: Daemon status data
310
311 """
312 assert self._ts_begin is not None
313
314 if not data:
315 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
316 raise _ImportExportError("Didn't become ready after %s seconds" %
317 self._timeouts.ready)
318
319 return False
320
321 self._daemon = data
322
323 return True
324
326 """Updates daemon status data.
327
328 @type success: bool
329 @param success: Whether fetching data was successful or not
330 @type data: L{objects.ImportExportStatus}
331 @param data: Daemon status data
332
333 """
334 if not success:
335 if self._ts_last_error is None:
336 self._ts_last_error = time.time()
337
338 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
339 raise _ImportExportError("Too many errors while updating data")
340
341 return False
342
343 self._ts_last_error = None
344
345 return self._SetDaemonData(data)
346
348 """Checks whether the daemon is listening.
349
350 """
351 raise NotImplementedError()
352
354 """Returns timeout to calculate connect timeout.
355
356 """
357 raise NotImplementedError()
358
360 """Checks whether the daemon is connected.
361
362 @rtype: bool
363 @return: Whether the daemon is connected
364
365 """
366 assert self._daemon, "Daemon status missing"
367
368 if self._ts_connected is not None:
369 return True
370
371 if self._daemon.connected:
372 self._ts_connected = time.time()
373
374
375 logging.debug("%s '%s' on %s is now connected",
376 self.MODE_TEXT, self._daemon_name, self.node_name)
377
378 self._cbs.ReportConnected(self, self._private)
379
380 return True
381
382 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
383 self._timeouts.connect):
384 raise _ImportExportError("Not connected after %s seconds" %
385 self._timeouts.connect)
386
387 return False
388
390 """Checks whether a progress update should be reported.
391
392 """
393 if ((self._ts_last_progress is None or
394 utils.TimeoutExpired(self._ts_last_progress,
395 self._timeouts.progress)) and
396 self._daemon and
397 self._daemon.progress_mbytes is not None and
398 self._daemon.progress_throughput is not None):
399 self._cbs.ReportProgress(self, self._private)
400 self._ts_last_progress = time.time()
401
403 """Checks whether the daemon exited.
404
405 @rtype: bool
406 @return: Whether the transfer is finished
407
408 """
409 assert self._daemon, "Daemon status missing"
410
411 if self._ts_finished:
412 return True
413
414 if self._daemon.exit_status is None:
415
416 self._CheckProgress()
417 return False
418
419 self._ts_finished = time.time()
420
421 self._ReportFinished(self._daemon.exit_status == 0,
422 self._daemon.error_message)
423
424 return True
425
427 """Transfer is finished or daemon exited.
428
429 @type success: bool
430 @param success: Whether the transfer was successful
431 @type message: string
432 @param message: Error message
433
434 """
435 assert self.success is None
436
437 self.success = success
438 self.final_message = message
439
440 if success:
441 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
442 self._daemon_name, self.node_name)
443 elif self._daemon_name:
444 self._lu.LogWarning("%s '%s' on %s failed: %s",
445 self.MODE_TEXT, self._daemon_name, self.node_name,
446 message)
447 else:
448 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
449 self.node_name, message)
450
451 self._cbs.ReportFinished(self, self._private)
452
454 """Makes the RPC call to finalize this import/export.
455
456 """
457 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
458
460 """Finalizes this import/export.
461
462 """
463 if self._daemon_name:
464 logging.info("Finalizing %s '%s' on %s",
465 self.MODE_TEXT, self._daemon_name, self.node_name)
466
467 result = self._Finalize()
468 if result.fail_msg:
469 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
470 self.MODE_TEXT, self._daemon_name,
471 self.node_name, result.fail_msg)
472 return False
473
474
475 self._daemon_name = None
476 self._ts_cleanup = time.time()
477
478 if error:
479 self._ReportFinished(False, error)
480
481 return True
482
485 MODE_TEXT = "import"
486
def __init__(self, lu, node_name, opts, instance, component,
             dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    Delegates the common setup to L{_DiskImportExportBase} and additionally
    remembers the I/O destination.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of the moment the daemon was first seen listening
    self._ts_listening = None
516
517 @property
519 """Returns the port the daemon is listening on.
520
521 """
522 if self._daemon:
523 return self._daemon.listen_port
524
525 return None
526
528 """Starts the import daemon.
529
530 """
531 return self._lu.rpc.call_import_start(self.node_name, self._opts,
532 self._instance, self._component,
533 (self._dest, self._dest_args))
534
536 """Checks whether the daemon is listening.
537
538 @rtype: bool
539 @return: Whether the daemon is listening
540
541 """
542 assert self._daemon, "Daemon status missing"
543
544 if self._ts_listening is not None:
545 return True
546
547 port = self._daemon.listen_port
548 if port is not None:
549 self._ts_listening = time.time()
550
551 logging.debug("Import '%s' on %s is now listening on port %s",
552 self._daemon_name, self.node_name, port)
553
554 self._cbs.ReportListening(self, self._private, self._component)
555
556 return True
557
558 if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
559 raise _ImportExportError("Not listening after %s seconds" %
560 self._timeouts.listen)
561
562 return False
563
565 """Returns the time since we started listening.
566
567 """
568 assert self._ts_listening is not None, \
569 ("Checking whether an import is connected is only useful"
570 " once it's been listening")
571
572 return self._ts_listening
573
576 MODE_TEXT = "export"
577
def __init__(self, lu, node_name, opts, dest_host, dest_port,
             instance, component, source, source_args,
             timeouts, cbs, private=None):
    """Initializes this class.

    Delegates the common setup to L{_DiskImportExportBase} and additionally
    remembers the destination address and the I/O source.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args
611
613 """Starts the export daemon.
614
615 """
616 return self._lu.rpc.call_export_start(self.node_name, self._opts,
617 self._dest_host, self._dest_port,
618 self._instance, self._component,
619 (self._source, self._source_args))
620
622 """Checks whether the daemon is listening.
623
624 """
625
626 return True
627
629 """Returns the time since the daemon started.
630
631 """
632 assert self._ts_begin is not None
633
634 return self._ts_begin
635
657
660 MIN_DELAY = 1.0
661 MAX_DELAY = 20.0
662
664 """Initializes this class.
665
666 """
667 self._lu = lu
668 self._queue = []
669 self._pending_add = []
670
def Add(self, diskie):
    """Adds an import/export object to the loop.

    The object is bound to this loop through C{diskie.SetLoop} and placed
    on the pending list; it is not put on the main queue here.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # New objects are staged on a separate list instead of being appended to
    # the queue directly: callbacks invoked while the queue is being iterated
    # may call Add(), and the queue must not change during that iteration
    self._pending_add.append(diskie)
687
688 @staticmethod
690 """Collects the status for all import/export daemons.
691
692 """
693 daemon_status = {}
694
695 for node_name, names in daemons.iteritems():
696 result = lu.rpc.call_impexp_status(node_name, names)
697 if result.fail_msg:
698 lu.LogWarning("Failed to get daemon status on %s: %s",
699 node_name, result.fail_msg)
700 continue
701
702 assert len(names) == len(result.payload)
703
704 daemon_status[node_name] = dict(zip(names, result.payload))
705
706 return daemon_status
707
708 @staticmethod
710 """Gets the names of all active daemons.
711
712 """
713 result = {}
714 for diskie in queue:
715 if not diskie.active:
716 continue
717
718 try:
719
720 daemon_name = diskie.CheckDaemon()
721 except _ImportExportError, err:
722 logging.exception("%s failed", diskie.MODE_TEXT)
723 diskie.Finalize(error=str(err))
724 continue
725
726 result.setdefault(diskie.node_name, []).append(daemon_name)
727
728 assert len(queue) >= len(result)
729 assert len(queue) >= sum([len(names) for names in result.itervalues()])
730
731 logging.debug("daemons=%r", result)
732
733 return result
734
736 """Adds all pending import/export objects to the internal queue.
737
738 """
739 assert compat.all(diskie not in self._queue and diskie.loop == self
740 for diskie in self._pending_add)
741
742 self._queue.extend(self._pending_add)
743
744 del self._pending_add[:]
745
747 """Utility main loop.
748
749 """
750 while True:
751 self._AddPendingToQueue()
752
753
754 daemons = self._GetActiveDaemonNames(self._queue)
755 if not daemons:
756 break
757
758
759 data = self._CollectDaemonStatus(self._lu, daemons)
760
761
762 delay = self.MAX_DELAY
763 for diskie in self._queue:
764 if not diskie.active:
765 continue
766
767 try:
768 try:
769 all_daemon_data = data[diskie.node_name]
770 except KeyError:
771 result = diskie.SetDaemonData(False, None)
772 else:
773 result = \
774 diskie.SetDaemonData(True,
775 all_daemon_data[diskie.GetDaemonName()])
776
777 if not result:
778
779 delay = min(3.0, delay)
780 continue
781
782 if diskie.CheckFinished():
783
784 diskie.Finalize()
785 continue
786
787
788 delay = min(5.0, delay)
789
790 if not diskie.CheckListening():
791
792 delay = min(1.0, delay)
793 continue
794
795 if not diskie.CheckConnected():
796
797 delay = min(1.0, delay)
798 continue
799
800 except _ImportExportError, err:
801 logging.exception("%s failed", diskie.MODE_TEXT)
802 diskie.Finalize(error=str(err))
803
804 if not compat.any(diskie.active for diskie in self._queue):
805 break
806
807
808 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
809 logging.debug("Waiting for %ss", delay)
810 time.sleep(delay)
811
813 """Finalizes all pending transfers.
814
815 """
816 success = True
817
818 for diskie in self._queue:
819 success = diskie.Finalize() and success
820
821 return success
822
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
             dest_node, dest_ip):
    """Initializes this class.

    All arguments are stored unchanged as public attributes of the same
    name.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @param instance: Instance object
    @param timeouts: Timeouts for the transfer
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side, or C{None}
    @param dest_node: Destination node name
    @param dest_ip: IP address of the destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
840
844 """Called when a connection has been established.
845
846 """
847 assert self.src_cbs is None
848 assert dtp.src_export == ie
849 assert dtp.dest_import
850
851 self.feedback_fn("%s is sending data on %s" %
852 (dtp.data.name, ie.node_name))
853
863
865 """Called when a transfer has finished.
866
867 """
868 assert self.src_cbs is None
869 assert dtp.src_export == ie
870 assert dtp.dest_import
871
872 if ie.success:
873 self.feedback_fn("%s finished sending data" % dtp.data.name)
874 else:
875 self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
876 (dtp.data.name, ie.final_message, ie.recent_output))
877
878 dtp.RecordResult(ie.success)
879
880 cb = dtp.data.finished_fn
881 if cb:
882 cb()
883
884
885
886 if dtp.dest_import and not ie.success:
887 dtp.dest_import.Abort()
888
892 """Called when daemon started listening.
893
894 """
895 assert self.src_cbs
896 assert dtp.src_export is None
897 assert dtp.dest_import
898 assert dtp.export_opts
899
900 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
901
902
903 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
904 self.dest_ip, ie.listen_port, self.instance,
905 component, dtp.data.src_io, dtp.data.src_ioargs,
906 self.timeouts, self.src_cbs, private=dtp)
907 ie.loop.Add(de)
908
909 dtp.src_export = de
910
912 """Called when a connection has been established.
913
914 """
915 self.feedback_fn("%s is receiving data on %s" %
916 (dtp.data.name, self.dest_node))
917
919 """Called when a transfer has finished.
920
921 """
922 if ie.success:
923 self.feedback_fn("%s finished receiving data" % dtp.data.name)
924 else:
925 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
926 (dtp.data.name, ie.final_message, ie.recent_output))
927
928 dtp.RecordResult(ie.success)
929
930
931
932 if dtp.src_export and not ie.success:
933 dtp.src_export.Abort()
934
def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
             finished_fn):
    """Initializes this class.

    All arguments are stored unchanged as attributes of the same name.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
960
def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @param data: Transfer description this private data belongs to
    @type success: bool
    @param success: Initial status of the transfer
    @param export_opts: Import/export options kept for the export side of
        this transfer

    """
    self.data = data
    self.success = success
    self.export_opts = export_opts

    # Export and import objects are attached later, once they exist
    self.src_export = None
    self.dest_import = None
976
978 """Updates the status.
979
980 One failed part will cause the whole transfer to fail.
981
982 """
983 self.success = self.success and success
984
987 """Computes the magic value for a disk export or import.
988
989 @type base: string
990 @param base: Random seed value (can be the same for all disks of a transfer)
991 @type instance_name: string
992 @param instance_name: Name of instance
993 @type index: number
994 @param index: Disk index
995
996 """
997 h = compat.sha1_hash()
998 h.update(str(constants.RIE_VERSION))
999 h.update(base)
1000 h.update(instance_name)
1001 h.update(str(index))
1002 return h.hexdigest()
1003
1004
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
    """Transfers an instance's data from one node to another.

    For every non-empty entry of C{all_transfers} an import is queued on
    the destination node; the loop is then run and, whatever happens,
    finalized.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type src_node: string
    @param src_node: Source node name
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type all_transfers: list of L{DiskTransfer} instances
    @param all_transfers: List of all disk transfers to be made; false
        entries are skipped and reported as failed
    @rtype: list
    @return: List with a boolean (True=successful, False=failed) for success
        for each transfer

    """
    # Compression is fixed to constants.IEC_NONE for these transfers
    compress = constants.IEC_NONE

    logging.debug("Source node %s, destination node %s, compression '%s'",
                  src_node, dest_node, compress)

    timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
    src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                    src_node, None, dest_node, dest_ip)
    dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                   src_node, src_cbs, dest_node, dest_ip)

    all_dtp = []

    # One random base value shared by all disks of this transfer; the
    # per-disk magic is derived from it below
    base_magic = utils.GenerateSecret(6)

    ieloop = ImportExportLoop(lu)
    try:
        for idx, transfer in enumerate(all_transfers):
            if transfer:
                feedback_fn("Exporting %s from %s to %s" %
                            (transfer.name, src_node, dest_node))

                magic = _GetInstDiskMagic(base_magic, instance.name, idx)
                opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                                   compress=compress,
                                                   magic=magic)

                dtp = _DiskTransferPrivate(transfer, True, opts)

                di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                                transfer.dest_io, transfer.dest_ioargs,
                                timeouts, dest_cbs, private=dtp)
                ieloop.Add(di)

                dtp.dest_import = di
            else:
                # Placeholder so that results stay aligned with all_transfers
                dtp = _DiskTransferPrivate(None, False, None)

            all_dtp.append(dtp)

        ieloop.Run()
    finally:
        ieloop.FinalizeAll()

    assert len(all_dtp) == len(all_transfers)
    assert compat.all((dtp.src_export is None or
                       dtp.src_export.success is not None) and
                      (dtp.dest_import is None or
                       dtp.dest_import.success is not None)
                      for dtp in all_dtp), \
        "Not all imports/exports are finalized"

    return [bool(dtp.success) for dtp in all_dtp]
1079
def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks; determines the length of the
        per-disk result list

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, None until a result has been recorded
    self._dresults = [None] * disk_count
1089
1090 @property
1092 """Returns per-disk results.
1093
1094 """
1095 return self._dresults
1096
1098 """Called when a connection has been established.
1099
1100 """
1101 (idx, _) = private
1102
1103 self._feedback_fn("Disk %s is now sending data" % idx)
1104
1106 """Called when new progress information should be reported.
1107
1108 """
1109 (idx, _) = private
1110
1111 progress = ie.progress
1112 if not progress:
1113 return
1114
1115 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1116
1118 """Called when a transfer has finished.
1119
1120 """
1121 (idx, finished_fn) = private
1122
1123 if ie.success:
1124 self._feedback_fn("Disk %s finished sending data" % idx)
1125 else:
1126 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1127 (idx, ie.final_message, ie.recent_output))
1128
1129 self._dresults[idx] = bool(ie.success)
1130
1131 if finished_fn:
1132 finished_fn()
1133
def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot devices, empty until snapshots have been created
    self._snap_disks = []
    # One flag per instance disk, True once its snapshot has been removed
    self._removed_snaps = [False] * len(instance.disks)
1151
1153 """Creates an LVM snapshot for every disk of the instance.
1154
1155 """
1156 assert not self._snap_disks
1157
1158 instance = self._instance
1159 src_node = instance.primary_node
1160
1161 for idx, disk in enumerate(instance.disks):
1162 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1163 (idx, src_node))
1164
1165
1166
1167 result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1168 new_dev = False
1169 msg = result.fail_msg
1170 if msg:
1171 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1172 idx, src_node, msg)
1173 elif (not isinstance(result.payload, (tuple, list)) or
1174 len(result.payload) != 2):
1175 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1176 " result '%s'", idx, src_node, result.payload)
1177 else:
1178 disk_id = tuple(result.payload)
1179 disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1180 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1181 logical_id=disk_id, physical_id=disk_id,
1182 iv_name=disk.iv_name,
1183 params=disk_params)
1184
1185 self._snap_disks.append(new_dev)
1186
1187 assert len(self._snap_disks) == len(instance.disks)
1188 assert len(self._removed_snaps) == len(instance.disks)
1189
1191 """Removes an LVM snapshot.
1192
1193 @type disk_index: number
1194 @param disk_index: Index of the snapshot to be removed
1195
1196 """
1197 disk = self._snap_disks[disk_index]
1198 if disk and not self._removed_snaps[disk_index]:
1199 src_node = self._instance.primary_node
1200
1201 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1202 (disk_index, src_node))
1203
1204 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1205 if result.fail_msg:
1206 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1207 " %s: %s", disk_index, src_node, result.fail_msg)
1208 else:
1209 self._removed_snaps[disk_index] = True
1210
1212 """Intra-cluster instance export.
1213
1214 @type dest_node: L{objects.Node}
1215 @param dest_node: Destination node
1216
1217 """
1218 instance = self._instance
1219 src_node = instance.primary_node
1220
1221 assert len(self._snap_disks) == len(instance.disks)
1222
1223 transfers = []
1224
1225 for idx, dev in enumerate(self._snap_disks):
1226 if not dev:
1227 transfers.append(None)
1228 continue
1229
1230 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1231 dev.physical_id[1])
1232
1233 finished_fn = compat.partial(self._TransferFinished, idx)
1234
1235
1236 dt = DiskTransfer("snapshot/%s" % idx,
1237 constants.IEIO_SCRIPT, (dev, idx),
1238 constants.IEIO_FILE, (path, ),
1239 finished_fn)
1240 transfers.append(dt)
1241
1242
1243 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1244 src_node, dest_node.name,
1245 dest_node.secondary_ip,
1246 instance, transfers)
1247
1248 assert len(dresults) == len(instance.disks)
1249
1250 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1251 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1252 self._snap_disks)
1253 msg = result.fail_msg
1254 fin_resu = not msg
1255 if msg:
1256 self._lu.LogWarning("Could not finalize export for instance %s"
1257 " on node %s: %s", instance.name, dest_node.name, msg)
1258
1259 return (fin_resu, dresults)
1260
def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    Queues one L{DiskExport} per instance disk, runs the import/export
    loop and finalizes it in all cases.

    @type disk_info: list
    @param disk_info: Per-disk destination information; each entry is
        unpacked as C{(host, port, magic)}
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @rtype: tuple
    @return: C{(True, per-disk results)}; the first element is always
        C{True}

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
        for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                             disk_info)):
            # IPv6 is selected when the destination is a valid IPv6 address
            ipv6 = netutils.IP6Address.IsValid(host)

            opts = objects.ImportExportOptions(key_name=key_name,
                                               ca_pem=dest_ca_pem,
                                               magic=magic, ipv6=ipv6)

            self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
            finished_fn = compat.partial(self._TransferFinished, idx)
            ieloop.Add(DiskExport(self._lu, instance.primary_node,
                                  opts, host, port, instance, "disk%d" % idx,
                                  constants.IEIO_SCRIPT, (dev, idx),
                                  timeouts, cbs, private=(idx, finished_fn)))

        ieloop.Run()
    finally:
        ieloop.FinalizeAll()

    return (True, cbs.disk_results)
1303
1305 """Called once a transfer has finished.
1306
1307 @type idx: number
1308 @param idx: Disk index
1309
1310 """
1311 logging.debug("Transfer %s finished", idx)
1312 self._RemoveSnapshot(idx)
1313
1315 """Remove all snapshots.
1316
1317 """
1318 assert len(self._removed_snaps) == len(self._instance.disks)
1319 for idx in range(len(self._instance.disks)):
1320 self._RemoveSnapshot(idx)
1321
def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
             external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk slots, None until the corresponding value is known
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # Random salt generated once per callback object
    self._salt = utils.GenerateSecret(8)
1349
1350 @property
1352 """Returns per-disk results.
1353
1354 """
1355 return self._dresults
1356
def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client
    through the feedback function as a C{constants.ELOG_REMOTE_IMPORT}
    message. Nothing happens while at least one daemon has not reported
    its port yet.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
        return

    host = self._external_address

    # One signed (host, port, magic, ...) entry per disk, in disk order
    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         idx, host, port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
        "disks": disks,
        "x509_ca": self._x509_cert_pem,
        })
1379
def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    Records the daemon's listening port and magic value for the disk and
    then checks whether all daemons are listening.

    @param ie: Import/export object
    @param private: Private data passed to import/export object; a
      one-element tuple containing the disk index
    @param component: transfer component name (not used here)

    """
    # NOTE(review): the "def" line was missing from the extracted source; the
    # method name and the name of the third parameter are reconstructed and
    # must be confirmed against the base class.
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    # A daemon must not report listening twice for the same disk
    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()
1393
def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @param ie: Import/export object (not used here)
    @param private: Private data passed to import/export object; a
      one-element tuple containing the disk index

    """
    # NOTE(review): the "def" line was missing from the extracted source; the
    # method name is reconstructed and must be confirmed against the base
    # class.
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)
1401
def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    Reports success or failure through the feedback function and stores
    the outcome as a boolean in the per-disk results.

    @param ie: Import/export object
    @param private: Private data passed to import/export object; a
      one-element tuple containing the disk index

    """
    # NOTE(review): the "def" line was missing from the extracted source; the
    # method name is reconstructed and must be confirmed against the base
    # class.
    (idx, ) = private

    # Forget the listening details recorded for this disk
    self._daemon_port[idx] = None

    if ie.success:
        self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
        self._feedback_fn(("Disk %s failed to receive data: %s"
                           " (recent output: %s)") %
                          (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1419
1420
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
    """Imports an instance from another cluster.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type pnode: L{objects.Node}
    @param pnode: Primary node of instance as an object
    @type source_x509_ca: OpenSSL.crypto.X509
    @param source_x509_ca: Import source's X509 CA
    @type cds: string
    @param cds: Cluster domain secret
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @return: Per-disk results as collected by the import callbacks

    """
    source_ca_pem = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_PEM, source_x509_ca)

    magic_base = utils.GenerateSecret(6)

    # Use IPv6 for the daemons if the primary node's address is an IPv6 one
    ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

    # Create key and certificate on the instance's primary node
    result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                          constants.RIE_CERT_VALIDITY)
    result.Raise("Can't create X509 key and certificate on %s" % result.node)

    (x509_key_name, x509_cert_pem) = result.payload
    try:
        # Parse the certificate returned by the node
        x509_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, x509_cert_pem)

        # Sign the certificate with the cluster domain secret and a new salt
        signed_x509_cert_pem = utils.SignX509Certificate(
            x509_cert, cds, utils.GenerateSecret(8))

        cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                              len(instance.disks), pnode.primary_ip)

        ieloop = ImportExportLoop(lu)
        try:
            for idx, dev in enumerate(instance.disks):
                magic = _GetInstDiskMagic(magic_base, instance.name, idx)

                # Options for the import daemon of this disk
                opts = objects.ImportExportOptions(key_name=x509_key_name,
                                                   ca_pem=source_ca_pem,
                                                   magic=magic, ipv6=ipv6)

                ieloop.Add(DiskImport(lu, instance.primary_node, opts,
                                      instance, "disk%d" % idx,
                                      constants.IEIO_SCRIPT, (dev, idx),
                                      timeouts, cbs, private=(idx, )))

            ieloop.Run()
        finally:
            ieloop.FinalizeAll()
    finally:
        # Always remove the key and certificate created above
        result = lu.rpc.call_x509_cert_remove(instance.primary_node,
                                              x509_key_name)
        result.Raise("Can't remove X509 key and certificate on %s" %
                     result.node)

    return cbs.disk_results
1489
def _GetImportExportHandshakeMessage(version):
    """Returns the handshake message for a RIE protocol version.

    @type version: number
    @param version: Protocol version to put in front of the handshake text
    @rtype: string
    @return: The version and C{constants.RIE_HANDSHAKE}, joined by a colon

    """
    return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1498
1510
def CheckRemoteExportHandshake(cds, handshake):
    """Checks the handshake of a remote import/export.

    @type cds: string
    @param cds: Cluster domain secret
    @type handshake: sequence
    @param handshake: Handshake sent by remote peer; must unpack into
      exactly three items (version, HMAC digest, HMAC salt)
    @rtype: string or None
    @return: An error message if the handshake is not acceptable, C{None}
      otherwise

    """
    # NOTE(review): the "def" line was missing from the extracted source; the
    # function name is reconstructed and must be confirmed against the
    # callers.
    try:
        (version, hmac_digest, hmac_salt) = handshake
    except (TypeError, ValueError) as err:
        return "Invalid data: %s" % err

    # The HMAC is verified first, so the version is only compared once the
    # handshake is known to be signed with the same domain secret
    if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                                hmac_digest, salt=hmac_salt):
        return "Hash didn't match, clusters don't share the same domain secret"

    if version != constants.RIE_VERSION:
        return ("Clusters don't have the same remote import/export protocol"
                " (local=%s, remote=%s)" %
                (constants.RIE_VERSION, version))

    return None
1536
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
    """Returns the hashed text for import/export disk information.

    @type disk_index: number
    @param disk_index: Index of disk (included in hash)
    @type host: string
    @param host: Hostname
    @type port: number
    @param port: Daemon port
    @type magic: string
    @param magic: Magic value
    @rtype: string
    @return: The four values in the order given, joined by colons

    """
    return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1552
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
    """Verifies received disk information for an export.

    @type cds: string
    @param cds: Cluster domain secret
    @type disk_index: number
    @param disk_index: Index of disk (included in hash)
    @type disk_info: sequence
    @param disk_info: Disk information sent by remote peer; must unpack into
      exactly five items (host, port, magic, HMAC digest, HMAC salt)
    @rtype: tuple
    @return: Destination, the port as returned by
      C{utils.ValidateServiceName}, and the magic value
    @raise errors.GenericError: If the data can't be unpacked, if host, port
      or magic is missing, or if the HMAC doesn't match

    """
    # NOTE(review): the "def" line was missing from the extracted source; the
    # function name is reconstructed and must be confirmed against the
    # callers.
    try:
        (host, port, magic, hmac_digest, hmac_salt) = disk_info
    except (TypeError, ValueError) as err:
        raise errors.GenericError("Invalid data: %s" % err)

    if not (host and port and magic):
        raise errors.GenericError("Missing destination host, port or magic")

    msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

    if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
        raise errors.GenericError("HMAC is wrong")

    # IP addresses are used as they are, anything else goes through hostname
    # normalization
    if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
        destination = host
    else:
        destination = netutils.Hostname.GetNormalizedName(host)

    return (destination,
            utils.ValidateServiceName(port),
            magic)
1586
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
    """Computes the signed disk information for a remote import.

    @type cds: string
    @param cds: Cluster domain secret
    @type salt: string
    @param salt: HMAC salt
    @type disk_index: number
    @param disk_index: Index of disk (included in hash)
    @type host: string
    @param host: Hostname
    @type port: number
    @param port: Daemon port
    @type magic: string
    @param magic: Magic value
    @rtype: tuple
    @return: Host, port, magic, HMAC digest and salt, in this order

    """
    msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
    hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
    return (host, port, magic, hmac_digest, salt)
1608