1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Instance-related functions and classes for masterd.
32
33 """
34
35 import logging
36 import time
37 import OpenSSL
38
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import compat
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import netutils
45 from ganeti import pathutils
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """
55
class ImportExportTimeouts(object):
  """Timeout settings for a disk import/export.

  """
  # NOTE(review): the class header and __init__ signature were lost in the
  # numbered extraction; reconstructed (defaults inferred from the constants
  # and the single-argument call in TransferInstanceData) — verify upstream.

  #: Time until the daemon must have become ready (seconds)
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until repeated errors cause hard failure (seconds)
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time until the daemon must be listening (seconds)
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Interval between progress reports (seconds)
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
99
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  # NOTE(review): the class header and the four method def lines were lost in
  # the numbered extraction; names reconstructed from the call sites in this
  # file (ReportListening/ReportConnected/ReportProgress/ReportFinished).

  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
141
class _DiskImportExportBase(object):
  """Base class for disk import/export daemons (see L{DiskImport} and
  L{DiskExport}).

  """
  # NOTE(review): the class header and most method def lines were lost in the
  # numbered extraction; reconstructed from the visible bodies and the call
  # sites in ImportExportLoop — verify against upstream before relying on it.

  #: Mode name used in log/error messages; set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_uuid, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_uuid = node_uuid
    self.node_name = lu.cfg.GetNodeName(node_uuid)
    # Copy so the connect timeout below doesn't leak into the caller's options
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Pass the connect timeout on to the import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop (set once via SetLoop)
    self._loop = None

    # Timestamps for the different phases
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    if ((self._ts_last_progress is None or
         utils.TimeoutExpired(self._ts_last_progress,
                              self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon still running; maybe emit a progress report
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_uuid)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name,
                          self._lu.cfg.GetNodeName(self.node_uuid),
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self._lu.cfg.GetNodeName(self.node_uuid), message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
494
class DiskImport(_DiskImportExportBase):
  """Disk import daemon wrapper.

  """
  # NOTE(review): class header and some def lines were lost in the numbered
  # extraction; reconstructed from the visible bodies — verify upstream.

  MODE_TEXT = "import"

  def __init__(self, lu, node_uuid, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp when the daemon started listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_uuid, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
585
class DiskExport(_DiskImportExportBase):
  """Disk export daemon wrapper.

  """
  # NOTE(review): class header and some def lines were lost in the numbered
  # extraction; reconstructed from the visible bodies — verify upstream.

  MODE_TEXT = "export"

  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Exporting daemons connect out; they never listen
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
647
669
class ImportExportLoop(object):
  """Event loop driving a set of import/export daemons to completion.

  """
  # NOTE(review): class header and method def lines were lost in the numbered
  # extraction; reconstructed from the visible bodies — verify upstream.
  # Change: dict.iteritems()/itervalues() replaced with items()/values() and
  # "except X, err" with "except X as err" (valid on Python 2.6+ and 3.x).

  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Stage new objects so the queue is never modified while being iterated
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.items():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError as err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.values()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names (starting daemons as needed)
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in a few seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError as err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit before polling again
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
834
class _TransferInstCbBase(ImportExportCbBase):
  """Base class for intra-cluster transfer callbacks; holds shared state.

  """
  # NOTE(review): the class header was lost in the numbered extraction;
  # reconstructed — verify against upstream.

  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
               src_cbs, dest_node_uuid, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node_uuid = src_node_uuid
    self.src_cbs = src_cbs
    self.dest_node_uuid = dest_node_uuid
    self.dest_ip = dest_ip
852
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an intra-cluster transfer.

  """
  # NOTE(review): the class header, the two method def lines, and one whole
  # intervening method (original lines 866-874, presumably ReportProgress)
  # were lost in the numbered extraction. Only the visible methods are
  # reconstructed here — restore the missing method from upstream.

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # A failed export makes the corresponding import pointless; abort it
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
900
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an intra-cluster transfer.

  """
  # NOTE(review): the class header and method def lines were lost in the
  # numbered extraction; reconstructed — verify against upstream.

  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name,
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # A failed import makes the corresponding export pointless; abort it
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
947
class DiskTransfer(object):
  """Plain data container describing one disk transfer.

  """
  # NOTE(review): the class header was lost in the numbered extraction;
  # reconstructed — verify against upstream.

  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
973
class _DiskTransferPrivate(object):
  """Per-transfer private data shared between the transfer callbacks.

  """
  # NOTE(review): the class header was lost in the numbered extraction;
  # reconstructed — verify against upstream.

  def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @type success: bool

    """
    self.data = data
    self.success = success
    self.export_opts = export_opts

    # Filled in later by the callbacks
    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success
997
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  # NOTE(review): the def line was lost in the numbered extraction;
  # reconstructed from the call in TransferInstanceData — verify upstream.
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
1016
1017
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
                         dest_ip, compress, instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node_uuid: string
  @param src_node_uuid: Source node UUID
  @type dest_node_uuid: string
  @param dest_node_uuid: Destination node UUID
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type compress: string
  @param compress: Compression tool to use
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node_name, dest_node_name, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node_uuid, src_cbs, dest_node_uuid,
                                 dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node_name, dest_node_name))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # The import is started first; its ReportListening callback then
        # starts the matching export on the source node
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped disk; recorded as failed
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
    "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
1095
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks collecting per-disk results for an inter-cluster export.

  """
  # NOTE(review): the class header and method def lines were lost in the
  # numbered extraction; reconstructed — verify against upstream.

  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1149
1152 - def __init__(self, lu, feedback_fn, instance):
1153 """Initializes this class.
1154
1155 @param lu: Logical unit instance
1156 @param feedback_fn: Feedback function
1157 @type instance: L{objects.Instance}
1158 @param instance: Instance object
1159
1160 """
1161 self._lu = lu
1162 self._feedback_fn = feedback_fn
1163 self._instance = instance
1164
1165 self._snapshots = [None] * len(instance.disks)
1166 self._snapshots_removed = [False] * len(instance.disks)
1167
1169 """Returns true if snapshots are ready to be used in exports.
1170
1171 """
1172 return all(self._snapshots)
1173
1175 """Attempts to create a snapshot for every disk of the instance.
1176
1177 Currently support drbd, plain and ext disk templates.
1178
1179 @rtype: bool
1180 @return: Whether following transfers can use snapshots
1181
1182 """
1183 if any(self._snapshots):
1184 raise errors.ProgrammerError("Snapshot creation was invoked more than "
1185 "once")
1186
1187 instance = self._instance
1188 inst_disks = self._lu.cfg.GetInstanceDisks(instance.uuid)
1189
1190
1191 if not all([d.SupportsSnapshots() for d in inst_disks]):
1192 return False
1193
1194 src_node = instance.primary_node
1195 src_node_name = self._lu.cfg.GetNodeName(src_node)
1196
1197 for idx, disk in enumerate(inst_disks):
1198 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1199 (idx, src_node_name))
1200
1201
1202
1203 result = self._lu.rpc.call_blockdev_snapshot(src_node,
1204 (disk, instance),
1205 None, None)
1206 msg = result.fail_msg
1207 if msg:
1208 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1209 idx, src_node_name, msg)
1210 elif (not isinstance(result.payload, (tuple, list)) or
1211 len(result.payload) != 2):
1212 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1213 " result '%s'", idx, src_node_name, result.payload)
1214 else:
1215 disk_id = tuple(result.payload)
1216
1217
1218 if disk.dev_type == constants.DT_EXT:
1219 dev_type = constants.DT_EXT
1220 else:
1221 dev_type = constants.DT_PLAIN
1222 disk_params = constants.DISK_LD_DEFAULTS[dev_type].copy()
1223 new_dev = objects.Disk(dev_type=dev_type, size=disk.size,
1224 logical_id=disk_id, iv_name=disk.iv_name,
1225 params=disk_params)
1226 new_dev.uuid = self._lu.cfg.GenerateUniqueID(self._lu.proc.GetECId())
1227
1228 self._snapshots[idx] = new_dev
1229 self._snapshots_removed[idx] = False
1230
1231
1232 if self._SnapshotsReady():
1233 return True
1234 else:
1235
1236
1237 self.Cleanup()
1238 return False
1239
1241 """Removes an LVM snapshot.
1242
1243 @type disk_index: number
1244 @param disk_index: Index of the snapshot to be removed
1245
1246 """
1247 snapshot = self._snapshots[disk_index]
1248 if snapshot is not None and not self._snapshots_removed[disk_index]:
1249 src_node_uuid = self._instance.primary_node
1250 src_node_name = self._lu.cfg.GetNodeName(src_node_uuid)
1251
1252 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1253 (disk_index, src_node_name))
1254
1255 result = self._lu.rpc.call_blockdev_remove(src_node_uuid,
1256 (snapshot, self._instance))
1257 if result.fail_msg:
1258 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1259 " %s: %s", disk_index, src_node_name,
1260 result.fail_msg)
1261 else:
1262 self._snapshots_removed[disk_index] = True
1263
1265 """Returns disks to be transferred, whether snapshots or instance disks.
1266
1267 @rtype: list of L{objects.Disk}
1268 @return: The disks to transfer
1269
1270 """
1271 if self._SnapshotsReady():
1272 return self._snapshots
1273 else:
1274 return self._lu.cfg.GetInstanceDisks(self._instance.uuid)
1275
1277 """Returns a label which should be used to represent a disk to transfer.
1278
1279 @type idx: int
1280 @param idx: The disk index
1281 @rtype: string
1282
1283 """
1284 if self._SnapshotsReady():
1285 return "snapshot/%d" % idx
1286 else:
1287 return "disk/%d" % idx
1288
1290 """Intra-cluster instance export.
1291
1292 @type dest_node: L{objects.Node}
1293 @param dest_node: Destination node
1294 @type compress: string
1295 @param compress: Compression tool to use
1296
1297 """
1298 disks_to_transfer = self._GetDisksToTransfer()
1299
1300 instance = self._instance
1301 src_node_uuid = instance.primary_node
1302
1303 transfers = []
1304
1305 for idx, dev in enumerate(disks_to_transfer):
1306 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1307 dev.uuid)
1308
1309 finished_fn = compat.partial(self._TransferFinished, idx)
1310
1311 if instance.os:
1312 src_io = constants.IEIO_SCRIPT
1313 src_ioargs = ((dev, instance), idx)
1314 else:
1315 src_io = constants.IEIO_RAW_DISK
1316 src_ioargs = (dev, instance)
1317
1318
1319 dt = DiskTransfer(self._GetDiskLabel(idx), src_io, src_ioargs,
1320 constants.IEIO_FILE, (path, ), finished_fn)
1321 transfers.append(dt)
1322
1323
1324 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1325 src_node_uuid, dest_node.uuid,
1326 dest_node.secondary_ip,
1327 compress,
1328 instance, transfers)
1329
1330 assert len(dresults) == len(instance.disks)
1331
1332
1333 if all(dresults):
1334 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1335 result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1336 disks_to_transfer)
1337 msg = result.fail_msg
1338 fin_resu = not msg
1339 if msg:
1340 self._lu.LogWarning("Could not finalize export for instance %s"
1341 " on node %s: %s", instance.name, dest_node.name,
1342 msg)
1343 else:
1344 fin_resu = False
1345 self._lu.LogWarning("Some disk exports have failed; there may be "
1346 "leftover data for instance %s on node %s",
1347 instance.name, dest_node.name)
1348
1349 return (fin_resu, dresults)
1350
1351 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, compress, timeouts):
1352 """Inter-cluster instance export.
1353
1354 @type disk_info: list
1355 @param disk_info: Per-disk destination information
1356 @type key_name: string
1357 @param key_name: Name of X509 key to use
1358 @type dest_ca_pem: string
1359 @param dest_ca_pem: Destination X509 CA in PEM format
1360 @type compress: string
1361 @param compress: Compression tool to use
1362 @type timeouts: L{ImportExportTimeouts}
1363 @param timeouts: Timeouts for this import
1364
1365 """
1366 instance = self._instance
1367 disks_to_transfer = self._GetDisksToTransfer()
1368
1369 assert len(disk_info) == len(disks_to_transfer)
1370
1371 cbs = _RemoteExportCb(self._feedback_fn, len(disks_to_transfer))
1372
1373 ieloop = ImportExportLoop(self._lu)
1374 try:
1375 for idx, (dev, (host, port, magic)) in enumerate(zip(disks_to_transfer,
1376 disk_info)):
1377
1378 ipv6 = netutils.IP6Address.IsValid(host)
1379
1380 opts = objects.ImportExportOptions(key_name=key_name,
1381 ca_pem=dest_ca_pem,
1382 magic=magic,
1383 compress=compress,
1384 ipv6=ipv6)
1385
1386 if instance.os:
1387 src_io = constants.IEIO_SCRIPT
1388 src_ioargs = ((dev, instance), idx)
1389 else:
1390 src_io = constants.IEIO_RAW_DISK
1391 src_ioargs = (dev, instance)
1392
1393 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1394 finished_fn = compat.partial(self._TransferFinished, idx)
1395 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1396 opts, host, port, instance, "disk%d" % idx,
1397 src_io, src_ioargs,
1398 timeouts, cbs, private=(idx, finished_fn)))
1399
1400 ieloop.Run()
1401 finally:
1402 ieloop.FinalizeAll()
1403
1404 return (True, cbs.disk_results)
1405
    """Called once a transfer has finished.

    Logs completion and removes the snapshot that served as the source of
    the transfer for this disk.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    # The snapshot is no longer needed once the disk's data has been sent
    self._RemoveSnapshot(idx)
1415
    """Remove all snapshots.

    Iterates over every disk index and removes any snapshot still
    registered for it.

    """
    for idx in range(len(self._snapshots)):
      self._RemoveSnapshot(idx)
1422
1425 - def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1426 external_address):
1427 """Initializes this class.
1428
1429 @type cds: string
1430 @param cds: Cluster domain secret
1431 @type x509_cert_pem: string
1432 @param x509_cert_pem: CA used for signing import key
1433 @type disk_count: number
1434 @param disk_count: Number of disks
1435 @type external_address: string
1436 @param external_address: External address of destination node
1437
1438 """
1439 ImportExportCbBase.__init__(self)
1440 self._feedback_fn = feedback_fn
1441 self._cds = cds
1442 self._x509_cert_pem = x509_cert_pem
1443 self._disk_count = disk_count
1444 self._external_address = external_address
1445
1446 self._dresults = [None] * disk_count
1447 self._daemon_port = [None] * disk_count
1448
1449 self._salt = utils.GenerateSecret(8)
1450
  @property
    """Returns per-disk results.

    Entries are C{None} while a disk transfer is still pending and are
    set to a boolean success flag once the transfer has finished.

    """
    return self._dresults
1457
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Entries are (port, magic) tuples once a daemon listens, None before
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    # Build the signed per-disk connection information for the client
    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })
1480
    """Called when daemon started listening.

    Records the daemon's listen port and magic value; once every disk's
    daemon is listening, the connection details are sent to the client.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    # A daemon must not report listening more than once per disk
    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()
1494
    """Called when a connection has been established.

    Only reports progress to the user; no internal state is changed.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)
1502
    """Called when a transfer has finished.

    Clears the stored daemon port information and records the per-disk
    result as a boolean.

    """
    (idx, ) = private

    # The daemon is no longer listening for this disk
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1520
1521
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, compress, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type compress: string
  @param compress: Compression tool to use
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @rtype: list
  @return: Per-disk results

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # The primary node's address family decides whether IPv6 is used
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create X509 key and certificate on the importing node; they are
  # removed again in the finally clause below
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load the certificate returned by the node
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign the certificate with the cluster domain secret so that the
    # exporting cluster can verify it
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    try:
      for idx, dev in enumerate(inst_disks):
        # Per-disk magic value, verified by the receiving daemon
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic,
                                           compress=compress,
                                           ipv6=ipv6)

        # With an OS definition the import script is used, otherwise the
        # raw disk data is received
        if instance.os:
          src_io = constants.IEIO_SCRIPT
          src_ioargs = ((dev, instance), idx)
        else:
          src_io = constants.IEIO_RAW_DISK
          src_ioargs = (dev, instance)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              src_io, src_ioargs,
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove the temporary X509 key and certificate again
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1602
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: RIE protocol version
  @rtype: string
  @return: Message to be signed with the cluster domain secret

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1611
1623
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: Error message as a string if the handshake is invalid,
    C{None} if it is valid

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  # Verify the HMAC first; only a peer sharing the cluster domain secret
  # can produce a valid signature for the version message
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
1649
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: string
  @return: Message to be signed with the cluster domain secret

  """
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1665
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @rtype: tuple
  @return: Destination address, validated port and magic value
  @raise errors.GenericError: If the disk information is malformed,
    incomplete or fails HMAC verification

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # IP addresses are used verbatim, hostnames are normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
1699
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: Disk information with HMAC digest and salt appended, suitable
    for verification with L{CheckRemoteExportDiskInfo}

  """
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, magic, hmac_digest, salt)
1721
1728
1754
1755 return sum(map(size_f, disks))
1756