22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
40 """Local exception to report import/export errors.
41
42 """
43
46
47 DEFAULT_READY_TIMEOUT = 10
48
49
50 DEFAULT_ERROR_TIMEOUT = 10
51
52
53 DEFAULT_LISTEN_TIMEOUT = 10
54
55
56 DEFAULT_PROGRESS_INTERVAL = 60
57
58 __slots__ = [
59 "error",
60 "ready",
61 "listen",
62 "connect",
63 "progress",
64 ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
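
# Example (sketch; not from the original module): callers usually override
# only the connect timeout and keep the class defaults for everything else,
# e.g.
#
#   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
#
# as TransferInstanceData does further down; individual values can be tuned
# via keyword arguments, e.g. ImportExportTimeouts(60, progress=120).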
93 """Callbacks for disk import/export.
94
95 """
97 """Called when daemon started listening.
98
99 @type ie: Subclass of L{_DiskImportExportBase}
100 @param ie: Import/export object
101 @param private: Private data passed to import/export object
102 @param component: transfer component name
103
104 """
105
107 """Called when a connection has been established.
108
109 @type ie: Subclass of L{_DiskImportExportBase}
110 @param ie: Import/export object
111 @param private: Private data passed to import/export object
112
113 """
114
116 """Called when new progress information should be reported.
117
118 @type ie: Subclass of L{_DiskImportExportBase}
119 @param ie: Import/export object
120 @param private: Private data passed to import/export object
121
122 """
123
125 """Called when a transfer has finished.
126
127 @type ie: Subclass of L{_DiskImportExportBase}
128 @param ie: Import/export object
129 @param private: Private data passed to import/export object
130
131 """
132


class _DiskImportExportBase(object):
  MODE_TEXT = None

  def __init__(self, lu, node_uuid, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_uuid = node_uuid
    self.node_name = lu.cfg.GetNodeName(node_uuid)
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # The import/export daemon gets its connect timeout via these options
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.
195
196 """
197 if self._daemon:
198 return "\n".join(self._daemon.recent_output)
199
200 return None

  @property
  def progress(self):
    """Returns transfer progress information.
205
206 """
207 if not self._daemon:
208 return None
209
210 return (self._daemon.progress_mbytes,
211 self._daemon.progress_throughput,
212 self._daemon.progress_percent,
213 self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.
218
219 """
220 return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.
225
226 """
227 return self.success is None

  @property
  def loop(self):
    """Returns parent loop.
232
233 @rtype: L{ImportExportLoop}
234
235 """
236 return self._loop
237
239 """Sets the parent loop.
240
241 @type loop: L{ImportExportLoop}
242
243 """
244 if self._loop:
245 raise errors.ProgrammerError("Loop can only be set once")
246
247 self._loop = loop
248
250 """Starts the import/export daemon.
251
252 """
253 raise NotImplementedError()
254
256 """Checks whether daemon has been started and if not, starts it.
257
258 @rtype: string
259 @return: Daemon name
260
261 """
262 assert self._ts_cleanup is None
263
264 if self._daemon_name is None:
265 assert self._ts_begin is None
266
267 result = self._StartDaemon()
268 if result.fail_msg:
269 raise _ImportExportError("Failed to start %s on %s: %s" %
270 (self.MODE_TEXT, self.node_name,
271 result.fail_msg))
272
273 daemon_name = result.payload
274
275 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276 self.node_name)
277
278 self._ts_begin = time.time()
279 self._daemon_name = daemon_name
280
281 return self._daemon_name
282
284 """Returns the daemon name.
285
286 """
287 assert self._daemon_name, "Daemon has not been started"
288 assert self._ts_cleanup is None
289 return self._daemon_name
290
292 """Sends SIGTERM to import/export daemon (if still active).
293
294 """
295 if self._daemon_name:
296 self._lu.LogWarning("Aborting %s '%s' on %s",
297 self.MODE_TEXT, self._daemon_name, self.node_uuid)
298 result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299 if result.fail_msg:
300 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301 self.MODE_TEXT, self._daemon_name,
302 self.node_uuid, result.fail_msg)
303 return False
304
305 return True
306
308 """Internal function for updating status daemon data.
309
310 @type data: L{objects.ImportExportStatus}
311 @param data: Daemon status data
312
313 """
314 assert self._ts_begin is not None
315
316 if not data:
317 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318 raise _ImportExportError("Didn't become ready after %s seconds" %
319 self._timeouts.ready)
320
321 return False
322
323 self._daemon = data
324
325 return True
326
328 """Updates daemon status data.
329
330 @type success: bool
331 @param success: Whether fetching data was successful or not
332 @type data: L{objects.ImportExportStatus}
333 @param data: Daemon status data
334
335 """
336 if not success:
337 if self._ts_last_error is None:
338 self._ts_last_error = time.time()
339
340 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341 raise _ImportExportError("Too many errors while updating data")
342
343 return False
344
345 self._ts_last_error = None
346
347 return self._SetDaemonData(data)
348
350 """Checks whether the daemon is listening.
351
352 """
353 raise NotImplementedError()
354
356 """Returns timeout to calculate connect timeout.
357
358 """
359 raise NotImplementedError()
360
362 """Checks whether the daemon is connected.
363
364 @rtype: bool
365 @return: Whether the daemon is connected
366
367 """
368 assert self._daemon, "Daemon status missing"
369
370 if self._ts_connected is not None:
371 return True
372
373 if self._daemon.connected:
374 self._ts_connected = time.time()
375
376
377 logging.debug("%s '%s' on %s is now connected",
378 self.MODE_TEXT, self._daemon_name, self.node_uuid)
379
380 self._cbs.ReportConnected(self, self._private)
381
382 return True
383
384 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385 self._timeouts.connect):
386 raise _ImportExportError("Not connected after %s seconds" %
387 self._timeouts.connect)
388
389 return False
390
392 """Checks whether a progress update should be reported.
393
394 """
395 if ((self._ts_last_progress is None or
396 utils.TimeoutExpired(self._ts_last_progress,
397 self._timeouts.progress)) and
398 self._daemon and
399 self._daemon.progress_mbytes is not None and
400 self._daemon.progress_throughput is not None):
401 self._cbs.ReportProgress(self, self._private)
402 self._ts_last_progress = time.time()
403
405 """Checks whether the daemon exited.
406
407 @rtype: bool
408 @return: Whether the transfer is finished
409
410 """
411 assert self._daemon, "Daemon status missing"
412
413 if self._ts_finished:
414 return True
415
416 if self._daemon.exit_status is None:
417
418 self._CheckProgress()
419 return False
420
421 self._ts_finished = time.time()
422
423 self._ReportFinished(self._daemon.exit_status == 0,
424 self._daemon.error_message)
425
426 return True
427
429 """Transfer is finished or daemon exited.
430
431 @type success: bool
432 @param success: Whether the transfer was successful
433 @type message: string
434 @param message: Error message
435
436 """
437 assert self.success is None
438
439 self.success = success
440 self.final_message = message
441
442 if success:
443 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444 self._daemon_name, self.node_uuid)
445 elif self._daemon_name:
446 self._lu.LogWarning("%s '%s' on %s failed: %s",
447 self.MODE_TEXT, self._daemon_name,
448 self._lu.cfg.GetNodeName(self.node_uuid),
449 message)
450 else:
451 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452 self._lu.cfg.GetNodeName(self.node_uuid), message)
453
454 self._cbs.ReportFinished(self, self._private)
455
457 """Makes the RPC call to finalize this import/export.
458
459 """
460 return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461
463 """Finalizes this import/export.
464
465 """
466 if self._daemon_name:
467 logging.info("Finalizing %s '%s' on %s",
468 self.MODE_TEXT, self._daemon_name, self.node_uuid)
469
470 result = self._Finalize()
471 if result.fail_msg:
472 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473 self.MODE_TEXT, self._daemon_name,
474 self.node_uuid, result.fail_msg)
475 return False
476
477
478 self._daemon_name = None
479 self._ts_cleanup = time.time()
480
481 if error:
482 self._ReportFinished(False, error)
483
484 return True
485


class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"
489
  def __init__(self, lu, node_uuid, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
492 """Initializes this class.
493
494 @param lu: Logical unit instance
495 @type node_uuid: string
    @param node_uuid: Node UUID for import
497 @type opts: L{objects.ImportExportOptions}
498 @param opts: Import/export daemon options
499 @type instance: L{objects.Instance}
500 @param instance: Instance object
501 @type component: string
502 @param component: which part of the instance is being imported
503 @param dest: I/O destination
504 @param dest_args: I/O arguments
505 @type timeouts: L{ImportExportTimeouts}
506 @param timeouts: Timeouts for this import
507 @type cbs: L{ImportExportCbBase}
508 @param cbs: Callbacks
509 @param private: Private data for callback functions
510
511 """
512 _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513 component, timeouts, cbs, private)
514 self._dest = dest
515 self._dest_args = dest_args
516
517
518 self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.
523
524 """
525 if self._daemon:
526 return self._daemon.listen_port
527
528 return None
529
531 """Starts the import daemon.
532
533 """
534 return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535 self._instance, self._component,
536 (self._dest, self._dest_args))
537
539 """Checks whether the daemon is listening.
540
541 @rtype: bool
542 @return: Whether the daemon is listening
543
544 """
545 assert self._daemon, "Daemon status missing"
546
547 if self._ts_listening is not None:
548 return True
549
550 port = self._daemon.listen_port
551 if port is not None:
552 self._ts_listening = time.time()
553
554 logging.debug("Import '%s' on %s is now listening on port %s",
555 self._daemon_name, self.node_uuid, port)
556
557 self._cbs.ReportListening(self, self._private, self._component)
558
559 return True
560
561 if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562 raise _ImportExportError("Not listening after %s seconds" %
563 self._timeouts.listen)
564
565 return False
566
568 """Returns the time since we started listening.
569
570 """
571 assert self._ts_listening is not None, \
572 ("Checking whether an import is connected is only useful"
573 " once it's been listening")
574
575 return self._ts_listening
576


class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"
580
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
584 """Initializes this class.
585
586 @param lu: Logical unit instance
587 @type node_uuid: string
    @param node_uuid: Node UUID for export
589 @type opts: L{objects.ImportExportOptions}
590 @param opts: Import/export daemon options
591 @type dest_host: string
592 @param dest_host: Destination host name or IP address
593 @type dest_port: number
594 @param dest_port: Destination port number
595 @type instance: L{objects.Instance}
596 @param instance: Instance object
597 @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
603 @type cbs: L{ImportExportCbBase}
604 @param cbs: Callbacks
605 @param private: Private data for callback functions
606
607 """
608 _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609 component, timeouts, cbs, private)
610 self._dest_host = dest_host
611 self._dest_port = dest_port
612 self._source = source
613 self._source_args = source_args
614
616 """Starts the export daemon.
617
618 """
619 return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620 self._dest_host, self._dest_port,
621 self._instance, self._component,
622 (self._source, self._source_args))
623
625 """Checks whether the daemon is listening.
626
627 """
628
629 return True
630
632 """Returns the time since the daemon started.
633
634 """
635 assert self._ts_begin is not None
636
637 return self._ts_begin
638
660


class ImportExportLoop(object):
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0
665
667 """Initializes this class.
668
669 """
670 self._lu = lu
671 self._queue = []
672 self._pending_add = []
673
  def Add(self, diskie):
675 """Adds an import/export object to the loop.
676
677 @type diskie: Subclass of L{_DiskImportExportBase}
678 @param diskie: Import/export object
679
680 """
681 assert diskie not in self._pending_add
682 assert diskie.loop is None
683
684 diskie.SetLoop(self)
685
686
687
688
689 self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.
694
695 """
696 daemon_status = {}
697
698 for node_name, names in daemons.iteritems():
699 result = lu.rpc.call_impexp_status(node_name, names)
700 if result.fail_msg:
701 lu.LogWarning("Failed to get daemon status on %s: %s",
702 node_name, result.fail_msg)
703 continue
704
705 assert len(names) == len(result.payload)
706
707 daemon_status[node_name] = dict(zip(names, result.payload))
708
709 return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.
714
715 """
716 result = {}
717 for diskie in queue:
718 if not diskie.active:
719 continue
720
721 try:
722
723 daemon_name = diskie.CheckDaemon()
724 except _ImportExportError, err:
725 logging.exception("%s failed", diskie.MODE_TEXT)
726 diskie.Finalize(error=str(err))
727 continue
728
729 result.setdefault(diskie.node_name, []).append(daemon_name)
730
731 assert len(queue) >= len(result)
732 assert len(queue) >= sum([len(names) for names in result.itervalues()])
733
734 logging.debug("daemons=%r", result)
735
736 return result
737
739 """Adds all pending import/export objects to the internal queue.
740
741 """
742 assert compat.all(diskie not in self._queue and diskie.loop == self
743 for diskie in self._pending_add)
744
745 self._queue.extend(self._pending_add)
746
747 del self._pending_add[:]
748
750 """Utility main loop.
751
752 """
753 while True:
754 self._AddPendingToQueue()
755
756
757 daemons = self._GetActiveDaemonNames(self._queue)
758 if not daemons:
759 break
760
761
762 data = self._CollectDaemonStatus(self._lu, daemons)
763
764
765 delay = self.MAX_DELAY
766 for diskie in self._queue:
767 if not diskie.active:
768 continue
769
770 try:
771 try:
772 all_daemon_data = data[diskie.node_name]
773 except KeyError:
774 result = diskie.SetDaemonData(False, None)
775 else:
776 result = \
777 diskie.SetDaemonData(True,
778 all_daemon_data[diskie.GetDaemonName()])
779
780 if not result:
781
782 delay = min(3.0, delay)
783 continue
784
785 if diskie.CheckFinished():
786
787 diskie.Finalize()
788 continue
789
790
791 delay = min(5.0, delay)
792
793 if not diskie.CheckListening():
794
795 delay = min(1.0, delay)
796 continue
797
798 if not diskie.CheckConnected():
799
800 delay = min(1.0, delay)
801 continue
802
803 except _ImportExportError, err:
804 logging.exception("%s failed", diskie.MODE_TEXT)
805 diskie.Finalize(error=str(err))
806
807 if not compat.any(diskie.active for diskie in self._queue):
808 break
809
810
811 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812 logging.debug("Waiting for %ss", delay)
813 time.sleep(delay)
814
816 """Finalizes all pending transfers.
817
818 """
819 success = True
820
821 for diskie in self._queue:
822 success = diskie.Finalize() and success
823
824 return success
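
# Example (sketch; variable names are placeholders): the intended usage
# pattern, also followed by TransferInstanceData and RemoteImport below,
# is to run the loop inside try/finally so every daemon is finalized even
# if an error occurs:
#
#   ieloop = ImportExportLoop(lu)
#   try:
#     ieloop.Add(DiskImport(lu, node_uuid, opts, instance, "disk0",
#                           dest_io, dest_ioargs, timeouts, cbs))
#     ieloop.Run()
#   finally:
#     ieloop.FinalizeAll()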
825


class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
               src_cbs, dest_node_uuid, dest_ip):
830 """Initializes this class.
831
832 """
833 ImportExportCbBase.__init__(self)
834
835 self.lu = lu
836 self.feedback_fn = feedback_fn
837 self.instance = instance
838 self.timeouts = timeouts
839 self.src_node_uuid = src_node_uuid
840 self.src_cbs = src_cbs
841 self.dest_node_uuid = dest_node_uuid
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.
848
849 """
850 assert self.src_cbs is None
851 assert dtp.src_export == ie
852 assert dtp.dest_import
853
854 self.feedback_fn("%s is sending data on %s" %
855 (dtp.data.name, ie.node_name))
856
866
868 """Called when a transfer has finished.
869
870 """
871 assert self.src_cbs is None
872 assert dtp.src_export == ie
873 assert dtp.dest_import
874
875 if ie.success:
876 self.feedback_fn("%s finished sending data" % dtp.data.name)
877 else:
878 self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879 (dtp.data.name, ie.final_message, ie.recent_output))
880
881 dtp.RecordResult(ie.success)
882
883 cb = dtp.data.finished_fn
884 if cb:
885 cb()
886
887
888
889 if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.
896
897 """
898 assert self.src_cbs
899 assert dtp.src_export is None
900 assert dtp.dest_import
901 assert dtp.export_opts
902
903 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904
905
906 de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907 self.dest_ip, ie.listen_port, self.instance,
908 component, dtp.data.src_io, dtp.data.src_ioargs,
909 self.timeouts, self.src_cbs, private=dtp)
910 ie.loop.Add(de)
911
912 dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.
916
917 """
918 self.feedback_fn("%s is receiving data on %s" %
919 (dtp.data.name,
920 self.lu.cfg.GetNodeName(self.dest_node_uuid)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.
924
925 """
926 if ie.success:
927 self.feedback_fn("%s finished receiving data" % dtp.data.name)
928 else:
929 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930 (dtp.data.name, ie.final_message, ie.recent_output))
931
932 dtp.RecordResult(ie.success)
933
934
935
936 if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
943 """Initializes this class.
944
945 @type name: string
946 @param name: User-visible name for this transfer (e.g. "disk/0")
947 @param src_io: Source I/O type
948 @param src_ioargs: Source I/O arguments
949 @param dest_io: Destination I/O type
950 @param dest_ioargs: Destination I/O arguments
951 @type finished_fn: callable
952 @param finished_fn: Function called once transfer has finished
953
954 """
955 self.name = name
956
957 self.src_io = src_io
958 self.src_ioargs = src_ioargs
959
960 self.dest_io = dest_io
961 self.dest_ioargs = dest_ioargs
962
    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  def __init__(self, data, success, export_opts):
968 """Initializes this class.
969
970 @type data: L{DiskTransfer}
971 @type success: bool
972
973 """
974 self.data = data
975 self.success = success
976 self.export_opts = export_opts
977
978 self.src_export = None
979 self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.
983
984 One failed part will cause the whole transfer to fail.
985
986 """
    self.success = self.success and success


def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.
992
993 @type base: string
994 @param base: Random seed value (can be the same for all disks of a transfer)
995 @type instance_name: string
996 @param instance_name: Name of instance
997 @type index: number
998 @param index: Disk index
999
1000 """
1001 h = compat.sha1_hash()
1002 h.update(str(constants.RIE_VERSION))
1003 h.update(base)
1004 h.update(instance_name)
1005 h.update(str(index))
1006 return h.hexdigest()
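
# Example (sketch; values are illustrative): both sides of a transfer derive
# the same per-disk magic from a shared random base, e.g.
#
#   magic = _GetInstDiskMagic("abc123", "inst1.example.com", 0)
#
# which is the SHA-1 of the RIE version, the base secret, the instance name
# and the disk index concatenated; the import/export daemons use it to
# recognise the expected peer.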
1007
1008
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
                         dest_ip, instance, all_transfers):
1011 """Transfers an instance's data from one node to another.
1012
1013 @param lu: Logical unit instance
1014 @param feedback_fn: Feedback function
1015 @type src_node_uuid: string
1016 @param src_node_uuid: Source node UUID
1017 @type dest_node_uuid: string
1018 @param dest_node_uuid: Destination node UUID
1019 @type dest_ip: string
1020 @param dest_ip: IP address of destination node
1021 @type instance: L{objects.Instance}
1022 @param instance: Instance object
1023 @type all_transfers: list of L{DiskTransfer} instances
1024 @param all_transfers: List of all disk transfers to be made
1025 @rtype: list
  @return: List with one boolean per transfer (True=successful, False=failed)
1028
1029 """
1030
1031 compress = constants.IEC_NONE
1032
1033 src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1034 dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1035
1036 logging.debug("Source node %s, destination node %s, compression '%s'",
1037 src_node_name, dest_node_name, compress)
1038
1039 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1040 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1041 src_node_uuid, None, dest_node_uuid, dest_ip)
1042 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1043 src_node_uuid, src_cbs, dest_node_uuid,
1044 dest_ip)
1045
1046 all_dtp = []
1047
1048 base_magic = utils.GenerateSecret(6)
1049
1050 ieloop = ImportExportLoop(lu)
1051 try:
1052 for idx, transfer in enumerate(all_transfers):
1053 if transfer:
1054 feedback_fn("Exporting %s from %s to %s" %
1055 (transfer.name, src_node_name, dest_node_name))
1056
1057 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1058 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1059 compress=compress, magic=magic)
1060
1061 dtp = _DiskTransferPrivate(transfer, True, opts)
1062
1063 di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1064 transfer.dest_io, transfer.dest_ioargs,
1065 timeouts, dest_cbs, private=dtp)
1066 ieloop.Add(di)
1067
1068 dtp.dest_import = di
1069 else:
1070 dtp = _DiskTransferPrivate(None, False, None)
1071
1072 all_dtp.append(dtp)
1073
1074 ieloop.Run()
1075 finally:
1076 ieloop.FinalizeAll()
1077
1078 assert len(all_dtp) == len(all_transfers)
1079 assert compat.all((dtp.src_export is None or
1080 dtp.src_export.success is not None) and
1081 (dtp.dest_import is None or
1082 dtp.dest_import.success is not None)
1083 for dtp in all_dtp), \
1084 "Not all imports/exports are finalized"
1085
1086 return [bool(dtp.success) for dtp in all_dtp]
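
# Example (sketch; hypothetical names, and the I/O argument shapes are only
# illustrative): a caller builds one DiskTransfer per disk and gets back one
# boolean per requested transfer:
#
#   transfers = [DiskTransfer("disk/%d" % idx,
#                             constants.IEIO_RAW_DISK, (disk, ),
#                             constants.IEIO_RAW_DISK, (disk, ),
#                             None)
#                for idx, disk in enumerate(instance.disks)]
#   dresults = TransferInstanceData(lu, feedback_fn, src_node.uuid,
#                                   dest_node.uuid, dest_node.secondary_ip,
#                                   instance, transfers)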
1087


class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
1091 """Initializes this class.
1092
1093 """
1094 ImportExportCbBase.__init__(self)
1095 self._feedback_fn = feedback_fn
1096 self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.
1101
1102 """
1103 return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.
1107
1108 """
1109 (idx, _) = private
1110
1111 self._feedback_fn("Disk %s is now sending data" % idx)
1112
1114 """Called when new progress information should be reported.
1115
1116 """
1117 (idx, _) = private
1118
1119 progress = ie.progress
1120 if not progress:
1121 return
1122
1123 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.
1127
1128 """
1129 (idx, finished_fn) = private
1130
1131 if ie.success:
1132 self._feedback_fn("Disk %s finished sending data" % idx)
1133 else:
1134 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1135 (idx, ie.final_message, ie.recent_output))
1136
1137 self._dresults[idx] = bool(ie.success)
1138
1139 if finished_fn:
1140 finished_fn()
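
# Note (sketch): _RemoteExportCb expects each DiskExport to be registered
# with private=(idx, finished_fn), where idx is the disk index used to fill
# the per-disk results and finished_fn is an optional callable invoked once
# the disk has finished sending; ExportInstanceHelper.RemoteExport below
# wires this up via compat.partial(self._TransferFinished, idx).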
1141


class ExportInstanceHelper(object):
  def __init__(self, lu, feedback_fn, instance):
1145 """Initializes this class.
1146
1147 @param lu: Logical unit instance
1148 @param feedback_fn: Feedback function
1149 @type instance: L{objects.Instance}
1150 @param instance: Instance object
1151
1152 """
1153 self._lu = lu
1154 self._feedback_fn = feedback_fn
1155 self._instance = instance
1156
1157 self._snap_disks = []
1158 self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.
1162
1163 """
1164 assert not self._snap_disks
1165
1166 instance = self._instance
1167 src_node = instance.primary_node
1168
1169 for idx, disk in enumerate(instance.disks):
1170 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171 (idx, src_node))
1172
1173
1174
1175 result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176 new_dev = False
1177 msg = result.fail_msg
1178 if msg:
1179 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180 idx, src_node, msg)
1181 elif (not isinstance(result.payload, (tuple, list)) or
1182 len(result.payload) != 2):
1183 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184 " result '%s'", idx, src_node, result.payload)
1185 else:
1186 disk_id = tuple(result.payload)
1187 disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
1188 new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189 logical_id=disk_id, physical_id=disk_id,
1190 iv_name=disk.iv_name,
1191 params=disk_params)
1192
1193 self._snap_disks.append(new_dev)
1194
1195 assert len(self._snap_disks) == len(instance.disks)
1196 assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.
1200
1201 @type disk_index: number
1202 @param disk_index: Index of the snapshot to be removed
1203
1204 """
1205 disk = self._snap_disks[disk_index]
1206 if disk and not self._removed_snaps[disk_index]:
1207 src_node = self._instance.primary_node
1208
1209 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210 (disk_index, src_node))
1211
1212 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1213 if result.fail_msg:
1214 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1215 " %s: %s", disk_index, src_node, result.fail_msg)
1216 else:
1217 self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.
1221
1222 @type dest_node: L{objects.Node}
1223 @param dest_node: Destination node
1224
1225 """
1226 instance = self._instance
1227 src_node_uuid = instance.primary_node
1228
1229 assert len(self._snap_disks) == len(instance.disks)
1230
1231 transfers = []
1232
1233 for idx, dev in enumerate(self._snap_disks):
1234 if not dev:
1235 transfers.append(None)
1236 continue
1237
1238 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1239 dev.physical_id[1])
1240
1241 finished_fn = compat.partial(self._TransferFinished, idx)
1242
1243
1244 dt = DiskTransfer("snapshot/%s" % idx,
1245 constants.IEIO_SCRIPT, (dev, idx),
1246 constants.IEIO_FILE, (path, ),
1247 finished_fn)
1248 transfers.append(dt)
1249
1250
1251 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1252 src_node_uuid, dest_node.uuid,
1253 dest_node.secondary_ip,
1254 instance, transfers)
1255
1256 assert len(dresults) == len(instance.disks)
1257
1258 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1259 result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1260 self._snap_disks)
1261 msg = result.fail_msg
1262 fin_resu = not msg
1263 if msg:
1264 self._lu.LogWarning("Could not finalize export for instance %s"
1265 " on node %s: %s", instance.name, dest_node.name, msg)
1266
1267 return (fin_resu, dresults)
1268
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1270 """Inter-cluster instance export.
1271
1272 @type disk_info: list
1273 @param disk_info: Per-disk destination information
1274 @type key_name: string
1275 @param key_name: Name of X509 key to use
1276 @type dest_ca_pem: string
1277 @param dest_ca_pem: Destination X509 CA in PEM format
1278 @type timeouts: L{ImportExportTimeouts}
1279 @param timeouts: Timeouts for this import
1280
1281 """
1282 instance = self._instance
1283
1284 assert len(disk_info) == len(instance.disks)
1285
1286 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1287
1288 ieloop = ImportExportLoop(self._lu)
1289 try:
1290 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1291 disk_info)):
1292
1293 ipv6 = netutils.IP6Address.IsValid(host)
1294
1295 opts = objects.ImportExportOptions(key_name=key_name,
1296 ca_pem=dest_ca_pem,
1297 magic=magic, ipv6=ipv6)
1298
1299 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1300 finished_fn = compat.partial(self._TransferFinished, idx)
1301 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1302 opts, host, port, instance, "disk%d" % idx,
1303 constants.IEIO_SCRIPT, (dev, idx),
1304 timeouts, cbs, private=(idx, finished_fn)))
1305
1306 ieloop.Run()
1307 finally:
1308 ieloop.FinalizeAll()
1309
1310 return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.
1314
1315 @type idx: number
1316 @param idx: Disk index
1317
1318 """
1319 logging.debug("Transfer %s finished", idx)
1320 self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.
1324
1325 """
1326 assert len(self._removed_snaps) == len(self._instance.disks)
1327 for idx in range(len(self._instance.disks)):
1328 self._RemoveSnapshot(idx)
1329


class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
1334 """Initializes this class.
1335
1336 @type cds: string
1337 @param cds: Cluster domain secret
1338 @type x509_cert_pem: string
1339 @param x509_cert_pem: CA used for signing import key
1340 @type disk_count: number
1341 @param disk_count: Number of disks
1342 @type external_address: string
1343 @param external_address: External address of destination node
1344
1345 """
1346 ImportExportCbBase.__init__(self)
1347 self._feedback_fn = feedback_fn
1348 self._cds = cds
1349 self._x509_cert_pem = x509_cert_pem
1350 self._disk_count = disk_count
1351 self._external_address = external_address
1352
1353 self._dresults = [None] * disk_count
1354 self._daemon_port = [None] * disk_count
1355
1356 self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.
1361
1362 """
1363 return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.
1367
1368 If all daemons are listening, the information is sent to the client.
1369
1370 """
1371 if not compat.all(dp is not None for dp in self._daemon_port):
1372 return
1373
1374 host = self._external_address
1375
1376 disks = []
1377 for idx, (port, magic) in enumerate(self._daemon_port):
1378 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1379 idx, host, port, magic))
1380
1381 assert len(disks) == self._disk_count
1382
1383 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1384 "disks": disks,
1385 "x509_ca": self._x509_cert_pem,
1386 })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.
1390
1391 """
1392 (idx, ) = private
1393
1394 self._feedback_fn("Disk %s is now listening" % idx)
1395
1396 assert self._daemon_port[idx] is None
1397
1398 self._daemon_port[idx] = (ie.listen_port, ie.magic)
1399
1400 self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.
1404
1405 """
1406 (idx, ) = private
1407
1408 self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.
1412
1413 """
1414 (idx, ) = private
1415
1416
1417 self._daemon_port[idx] = None
1418
1419 if ie.success:
1420 self._feedback_fn("Disk %s finished receiving data" % idx)
1421 else:
1422 self._feedback_fn(("Disk %s failed to receive data: %s"
1423 " (recent output: %s)") %
1424 (idx, ie.final_message, ie.recent_output))
1425
1426 self._dresults[idx] = bool(ie.success)
1427
1428
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
1431 """Imports an instance from another cluster.
1432
1433 @param lu: Logical unit instance
1434 @param feedback_fn: Feedback function
1435 @type instance: L{objects.Instance}
1436 @param instance: Instance object
1437 @type pnode: L{objects.Node}
1438 @param pnode: Primary node of instance as an object
1439 @type source_x509_ca: OpenSSL.crypto.X509
1440 @param source_x509_ca: Import source's X509 CA
1441 @type cds: string
1442 @param cds: Cluster domain secret
1443 @type timeouts: L{ImportExportTimeouts}
1444 @param timeouts: Timeouts for this import
1445
1446 """
1447 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1448 source_x509_ca)
1449
1450 magic_base = utils.GenerateSecret(6)
1451
1452
1453 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1454
1455
1456 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1457 constants.RIE_CERT_VALIDITY)
1458 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1459
1460 (x509_key_name, x509_cert_pem) = result.payload
1461 try:
1462
1463 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1464 x509_cert_pem)
1465
1466
1467 signed_x509_cert_pem = \
1468 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1469
1470 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1471 len(instance.disks), pnode.primary_ip)
1472
1473 ieloop = ImportExportLoop(lu)
1474 try:
1475 for idx, dev in enumerate(instance.disks):
1476 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1477
1478
1479 opts = objects.ImportExportOptions(key_name=x509_key_name,
1480 ca_pem=source_ca_pem,
1481 magic=magic, ipv6=ipv6)
1482
1483 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1484 "disk%d" % idx,
1485 constants.IEIO_SCRIPT, (dev, idx),
1486 timeouts, cbs, private=(idx, )))
1487
1488 ieloop.Run()
1489 finally:
1490 ieloop.FinalizeAll()
1491 finally:
1492
1493 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1494 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1495
1496 return cbs.disk_results
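
# Example (sketch; variable names are placeholders): the remote cluster's CA
# certificate arrives in PEM form and is loaded with pyOpenSSL before being
# handed to RemoteImport:
#
#   source_x509_ca = OpenSSL.crypto.load_certificate(
#     OpenSSL.crypto.FILETYPE_PEM, source_ca_pem)
#   disk_results = RemoteImport(lu, feedback_fn, instance, pnode,
#                               source_x509_ca, cds, timeouts)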
1497
1500 """Returns the handshake message for a RIE protocol version.
1501
1502 @type version: number
1503
1504 """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  salt = utils.GenerateSecret(8)
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1521 """Checks the handshake of a remote import/export.
1522
1523 @type cds: string
1524 @param cds: Cluster domain secret
1525 @type handshake: sequence
1526 @param handshake: Handshake sent by remote peer
1527
1528 """
1529 try:
1530 (version, hmac_digest, hmac_salt) = handshake
1531 except (TypeError, ValueError), err:
1532 return "Invalid data: %s" % err
1533
1534 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1535 hmac_digest, salt=hmac_salt):
1536 return "Hash didn't match, clusters don't share the same domain secret"
1537
1538 if version != constants.RIE_VERSION:
1539 return ("Clusters don't have the same remote import/export protocol"
1540 " (local=%s, remote=%s)" %
1541 (constants.RIE_VERSION, version))
1542
1543 return None
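
# Example (sketch): a handshake received from the peer cluster is a
# (version, hmac_digest, hmac_salt) tuple; verification fails unless both
# clusters share the same cluster domain secret and RIE protocol version:
#
#   error = CheckRemoteExportHandshake(cds, remote_handshake)
#   if error:
#     raise errors.GenericError("Handshake check failed: %s" % error)
#
# where remote_handshake is the tuple announced by the other cluster.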
1544
1547 """Returns the hashed text for import/export disk information.
1548
1549 @type disk_index: number
1550 @param disk_index: Index of disk (included in hash)
1551 @type host: string
1552 @param host: Hostname
1553 @type port: number
1554 @param port: Daemon port
1555 @type magic: string
1556 @param magic: Magic value
1557
1558 """
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.
1564
1565 @type cds: string
1566 @param cds: Cluster domain secret
1567 @type disk_index: number
1568 @param disk_index: Index of disk (included in hash)
1569 @type disk_info: sequence
1570 @param disk_info: Disk information sent by remote peer
1571
1572 """
1573 try:
1574 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1575 except (TypeError, ValueError), err:
1576 raise errors.GenericError("Invalid data: %s" % err)
1577
1578 if not (host and port and magic):
1579 raise errors.GenericError("Missing destination host, port or magic")
1580
1581 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1582
1583 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1584 raise errors.GenericError("HMAC is wrong")
1585
1586 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1587 destination = host
1588 else:
1589 destination = netutils.Hostname.GetNormalizedName(host)
1590
1591 return (destination,
1592 utils.ValidateServiceName(port),
          magic)


def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.
1598
1599 @type cds: string
1600 @param cds: Cluster domain secret
1601 @type salt: string
1602 @param salt: HMAC salt
1603 @type disk_index: number
1604 @param disk_index: Index of disk (included in hash)
1605 @type host: string
1606 @param host: Hostname
1607 @type port: number
1608 @param port: Daemon port
1609 @type magic: string
1610 @param magic: Magic value
1611
1612 """
1613 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1614 hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1615 return (host, port, magic, hmac_digest, salt)
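
# Example (sketch; host and port are made up): the importing side signs the
# endpoint of each listening daemon and the exporting side verifies it
# before connecting:
#
#   info = ComputeRemoteImportDiskInfo(cds, utils.GenerateSecret(8), 0,
#                                      "target.example.com", 11000, magic)
#   (host, port, magic_) = CheckRemoteExportDiskInfo(cds, 0, info)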