1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
40 """Local exception to report import/export errors.
41
42 """
43
46
47 DEFAULT_READY_TIMEOUT = 10
48
49
50 DEFAULT_ERROR_TIMEOUT = 10
51
52
53 DEFAULT_LISTEN_TIMEOUT = 10
54
55
56 DEFAULT_PROGRESS_INTERVAL = 60
57
58 __slots__ = [
59 "error",
60 "ready",
61 "listen",
62 "connect",
63 "progress",
64 ]
65
71 """Initializes this class.
72
73 @type connect: number
74 @param connect: Timeout for establishing connection
75 @type listen: number
76 @param listen: Timeout for starting to listen for connections
77 @type error: number
78 @param error: Length of time until errors cause hard failure
79 @type ready: number
80 @param ready: Timeout for daemon to become ready
81 @type progress: number
82 @param progress: Progress update interval
83
84 """
85 self.error = error
86 self.ready = ready
87 self.listen = listen
88 self.connect = connect
89 self.progress = progress
90
93 """Callbacks for disk import/export.
94
95 """
97 """Called when daemon started listening.
98
99 @type ie: Subclass of L{_DiskImportExportBase}
100 @param ie: Import/export object
101 @param private: Private data passed to import/export object
102 @param component: transfer component name
103
104 """
105
107 """Called when a connection has been established.
108
109 @type ie: Subclass of L{_DiskImportExportBase}
110 @param ie: Import/export object
111 @param private: Private data passed to import/export object
112
113 """
114
116 """Called when new progress information should be reported.
117
118 @type ie: Subclass of L{_DiskImportExportBase}
119 @param ie: Import/export object
120 @param private: Private data passed to import/export object
121
122 """
123
125 """Called when a transfer has finished.
126
127 @type ie: Subclass of L{_DiskImportExportBase}
128 @param ie: Import/export object
129 @param private: Private data passed to import/export object
130
131 """
132
135 MODE_TEXT = None
136
137 - def __init__(self, lu, node_name, opts,
138 instance, component, timeouts, cbs, private=None):
139 """Initializes this class.
140
141 @param lu: Logical unit instance
142 @type node_name: string
143 @param node_name: Node name for import
144 @type opts: L{objects.ImportExportOptions}
145 @param opts: Import/export daemon options
146 @type instance: L{objects.Instance}
147 @param instance: Instance object
148 @type component: string
149 @param component: which part of the instance is being imported
150 @type timeouts: L{ImportExportTimeouts}
151 @param timeouts: Timeouts for this import
152 @type cbs: L{ImportExportCbBase}
153 @param cbs: Callbacks
154 @param private: Private data for callback functions
155
156 """
157 assert self.MODE_TEXT
158
159 self._lu = lu
160 self.node_name = node_name
161 self._opts = opts.Copy()
162 self._instance = instance
163 self._component = component
164 self._timeouts = timeouts
165 self._cbs = cbs
166 self._private = private
167
168
169 assert self._opts.connect_timeout is None
170 self._opts.connect_timeout = timeouts.connect
171
172
173 self._loop = None
174
175
176 self._ts_begin = None
177 self._ts_connected = None
178 self._ts_finished = None
179 self._ts_cleanup = None
180 self._ts_last_progress = None
181 self._ts_last_error = None
182
183
184 self.success = None
185 self.final_message = None
186
187
188 self._daemon_name = None
189 self._daemon = None
190
191 @property
193 """Returns the most recent output from the daemon.
194
195 """
196 if self._daemon:
197 return "\n".join(self._daemon.recent_output)
198
199 return None
200
201 @property
203 """Returns transfer progress information.
204
205 """
206 if not self._daemon:
207 return None
208
209 return (self._daemon.progress_mbytes,
210 self._daemon.progress_throughput,
211 self._daemon.progress_percent,
212 self._daemon.progress_eta)
213
214 @property
216 """Returns the magic value for this import/export.
217
218 """
219 return self._opts.magic
220
221 @property
223 """Determines whether this transport is still active.
224
225 """
226 return self.success is None
227
228 @property
230 """Returns parent loop.
231
232 @rtype: L{ImportExportLoop}
233
234 """
235 return self._loop
236
238 """Sets the parent loop.
239
240 @type loop: L{ImportExportLoop}
241
242 """
243 if self._loop:
244 raise errors.ProgrammerError("Loop can only be set once")
245
246 self._loop = loop
247
249 """Starts the import/export daemon.
250
251 """
252 raise NotImplementedError()
253
255 """Checks whether daemon has been started and if not, starts it.
256
257 @rtype: string
258 @return: Daemon name
259
260 """
261 assert self._ts_cleanup is None
262
263 if self._daemon_name is None:
264 assert self._ts_begin is None
265
266 result = self._StartDaemon()
267 if result.fail_msg:
268 raise _ImportExportError("Failed to start %s on %s: %s" %
269 (self.MODE_TEXT, self.node_name,
270 result.fail_msg))
271
272 daemon_name = result.payload
273
274 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
275 self.node_name)
276
277 self._ts_begin = time.time()
278 self._daemon_name = daemon_name
279
280 return self._daemon_name
281
283 """Returns the daemon name.
284
285 """
286 assert self._daemon_name, "Daemon has not been started"
287 assert self._ts_cleanup is None
288 return self._daemon_name
289
291 """Sends SIGTERM to import/export daemon (if still active).
292
293 """
294 if self._daemon_name:
295 self._lu.LogWarning("Aborting %s '%s' on %s",
296 self.MODE_TEXT, self._daemon_name, self.node_name)
297 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
298 if result.fail_msg:
299 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
300 self.MODE_TEXT, self._daemon_name,
301 self.node_name, result.fail_msg)
302 return False
303
304 return True
305
307 """Internal function for updating status daemon data.
308
309 @type data: L{objects.ImportExportStatus}
310 @param data: Daemon status data
311
312 """
313 assert self._ts_begin is not None
314
315 if not data:
316 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
317 raise _ImportExportError("Didn't become ready after %s seconds" %
318 self._timeouts.ready)
319
320 return False
321
322 self._daemon = data
323
324 return True
325
327 """Updates daemon status data.
328
329 @type success: bool
330 @param success: Whether fetching data was successful or not
331 @type data: L{objects.ImportExportStatus}
332 @param data: Daemon status data
333
334 """
335 if not success:
336 if self._ts_last_error is None:
337 self._ts_last_error = time.time()
338
339 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
340 raise _ImportExportError("Too many errors while updating data")
341
342 return False
343
344 self._ts_last_error = None
345
346 return self._SetDaemonData(data)
347
349 """Checks whether the daemon is listening.
350
351 """
352 raise NotImplementedError()
353
355 """Returns timeout to calculate connect timeout.
356
357 """
358 raise NotImplementedError()
359
361 """Checks whether the daemon is connected.
362
363 @rtype: bool
364 @return: Whether the daemon is connected
365
366 """
367 assert self._daemon, "Daemon status missing"
368
369 if self._ts_connected is not None:
370 return True
371
372 if self._daemon.connected:
373 self._ts_connected = time.time()
374
375
376 logging.debug("%s '%s' on %s is now connected",
377 self.MODE_TEXT, self._daemon_name, self.node_name)
378
379 self._cbs.ReportConnected(self, self._private)
380
381 return True
382
383 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
384 self._timeouts.connect):
385 raise _ImportExportError("Not connected after %s seconds" %
386 self._timeouts.connect)
387
388 return False
389
391 """Checks whether a progress update should be reported.
392
393 """
394 if ((self._ts_last_progress is None or
395 utils.TimeoutExpired(self._ts_last_progress,
396 self._timeouts.progress)) and
397 self._daemon and
398 self._daemon.progress_mbytes is not None and
399 self._daemon.progress_throughput is not None):
400 self._cbs.ReportProgress(self, self._private)
401 self._ts_last_progress = time.time()
402
404 """Checks whether the daemon exited.
405
406 @rtype: bool
407 @return: Whether the transfer is finished
408
409 """
410 assert self._daemon, "Daemon status missing"
411
412 if self._ts_finished:
413 return True
414
415 if self._daemon.exit_status is None:
416
417 self._CheckProgress()
418 return False
419
420 self._ts_finished = time.time()
421
422 self._ReportFinished(self._daemon.exit_status == 0,
423 self._daemon.error_message)
424
425 return True
426
428 """Transfer is finished or daemon exited.
429
430 @type success: bool
431 @param success: Whether the transfer was successful
432 @type message: string
433 @param message: Error message
434
435 """
436 assert self.success is None
437
438 self.success = success
439 self.final_message = message
440
441 if success:
442 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
443 self._daemon_name, self.node_name)
444 elif self._daemon_name:
445 self._lu.LogWarning("%s '%s' on %s failed: %s",
446 self.MODE_TEXT, self._daemon_name, self.node_name,
447 message)
448 else:
449 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 self.node_name, message)
451
452 self._cbs.ReportFinished(self, self._private)
453
455 """Makes the RPC call to finalize this import/export.
456
457 """
458 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459
461 """Finalizes this import/export.
462
463 """
464 if self._daemon_name:
465 logging.info("Finalizing %s '%s' on %s",
466 self.MODE_TEXT, self._daemon_name, self.node_name)
467
468 result = self._Finalize()
469 if result.fail_msg:
470 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
471 self.MODE_TEXT, self._daemon_name,
472 self.node_name, result.fail_msg)
473 return False
474
475
476 self._daemon_name = None
477 self._ts_cleanup = time.time()
478
479 if error:
480 self._ReportFinished(False, error)
481
482 return True
483
486 MODE_TEXT = "import"
487
488 - def __init__(self, lu, node_name, opts, instance, component,
489 dest, dest_args, timeouts, cbs, private=None):
490 """Initializes this class.
491
492 @param lu: Logical unit instance
493 @type node_name: string
494 @param node_name: Node name for import
495 @type opts: L{objects.ImportExportOptions}
496 @param opts: Import/export daemon options
497 @type instance: L{objects.Instance}
498 @param instance: Instance object
499 @type component: string
500 @param component: which part of the instance is being imported
501 @param dest: I/O destination
502 @param dest_args: I/O arguments
503 @type timeouts: L{ImportExportTimeouts}
504 @param timeouts: Timeouts for this import
505 @type cbs: L{ImportExportCbBase}
506 @param cbs: Callbacks
507 @param private: Private data for callback functions
508
509 """
510 _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
511 component, timeouts, cbs, private)
512 self._dest = dest
513 self._dest_args = dest_args
514
515
516 self._ts_listening = None
517
518 @property
520 """Returns the port the daemon is listening on.
521
522 """
523 if self._daemon:
524 return self._daemon.listen_port
525
526 return None
527
529 """Starts the import daemon.
530
531 """
532 return self._lu.rpc.call_import_start(self.node_name, self._opts,
533 self._instance, self._component,
534 (self._dest, self._dest_args))
535
537 """Checks whether the daemon is listening.
538
539 @rtype: bool
540 @return: Whether the daemon is listening
541
542 """
543 assert self._daemon, "Daemon status missing"
544
545 if self._ts_listening is not None:
546 return True
547
548 port = self._daemon.listen_port
549 if port is not None:
550 self._ts_listening = time.time()
551
552 logging.debug("Import '%s' on %s is now listening on port %s",
553 self._daemon_name, self.node_name, port)
554
555 self._cbs.ReportListening(self, self._private, self._component)
556
557 return True
558
559 if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
560 raise _ImportExportError("Not listening after %s seconds" %
561 self._timeouts.listen)
562
563 return False
564
566 """Returns the time since we started listening.
567
568 """
569 assert self._ts_listening is not None, \
570 ("Checking whether an import is connected is only useful"
571 " once it's been listening")
572
573 return self._ts_listening
574
577 MODE_TEXT = "export"
578
579 - def __init__(self, lu, node_name, opts, dest_host, dest_port,
580 instance, component, source, source_args,
581 timeouts, cbs, private=None):
582 """Initializes this class.
583
584 @param lu: Logical unit instance
585 @type node_name: string
586 @param node_name: Node name for import
587 @type opts: L{objects.ImportExportOptions}
588 @param opts: Import/export daemon options
589 @type dest_host: string
590 @param dest_host: Destination host name or IP address
591 @type dest_port: number
592 @param dest_port: Destination port number
593 @type instance: L{objects.Instance}
594 @param instance: Instance object
595 @type component: string
596 @param component: which part of the instance is being imported
597 @param source: I/O source
598 @param source_args: I/O source
599 @type timeouts: L{ImportExportTimeouts}
600 @param timeouts: Timeouts for this import
601 @type cbs: L{ImportExportCbBase}
602 @param cbs: Callbacks
603 @param private: Private data for callback functions
604
605 """
606 _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
607 component, timeouts, cbs, private)
608 self._dest_host = dest_host
609 self._dest_port = dest_port
610 self._source = source
611 self._source_args = source_args
612
614 """Starts the export daemon.
615
616 """
617 return self._lu.rpc.call_export_start(self.node_name, self._opts,
618 self._dest_host, self._dest_port,
619 self._instance, self._component,
620 (self._source, self._source_args))
621
623 """Checks whether the daemon is listening.
624
625 """
626
627 return True
628
630 """Returns the time since the daemon started.
631
632 """
633 assert self._ts_begin is not None
634
635 return self._ts_begin
636
658
661 MIN_DELAY = 1.0
662 MAX_DELAY = 20.0
663
665 """Initializes this class.
666
667 """
668 self._lu = lu
669 self._queue = []
670 self._pending_add = []
671
672 - def Add(self, diskie):
673 """Adds an import/export object to the loop.
674
675 @type diskie: Subclass of L{_DiskImportExportBase}
676 @param diskie: Import/export object
677
678 """
679 assert diskie not in self._pending_add
680 assert diskie.loop is None
681
682 diskie.SetLoop(self)
683
684
685
686
687 self._pending_add.append(diskie)
688
689 @staticmethod
691 """Collects the status for all import/export daemons.
692
693 """
694 daemon_status = {}
695
696 for node_name, names in daemons.iteritems():
697 result = lu.rpc.call_impexp_status(node_name, names)
698 if result.fail_msg:
699 lu.LogWarning("Failed to get daemon status on %s: %s",
700 node_name, result.fail_msg)
701 continue
702
703 assert len(names) == len(result.payload)
704
705 daemon_status[node_name] = dict(zip(names, result.payload))
706
707 return daemon_status
708
709 @staticmethod
711 """Gets the names of all active daemons.
712
713 """
714 result = {}
715 for diskie in queue:
716 if not diskie.active:
717 continue
718
719 try:
720
721 daemon_name = diskie.CheckDaemon()
722 except _ImportExportError, err:
723 logging.exception("%s failed", diskie.MODE_TEXT)
724 diskie.Finalize(error=str(err))
725 continue
726
727 result.setdefault(diskie.node_name, []).append(daemon_name)
728
729 assert len(queue) >= len(result)
730 assert len(queue) >= sum([len(names) for names in result.itervalues()])
731
732 logging.debug("daemons=%r", result)
733
734 return result
735
737 """Adds all pending import/export objects to the internal queue.
738
739 """
740 assert compat.all(diskie not in self._queue and diskie.loop == self
741 for diskie in self._pending_add)
742
743 self._queue.extend(self._pending_add)
744
745 del self._pending_add[:]
746
748 """Utility main loop.
749
750 """
751 while True:
752 self._AddPendingToQueue()
753
754
755 daemons = self._GetActiveDaemonNames(self._queue)
756 if not daemons:
757 break
758
759
760 data = self._CollectDaemonStatus(self._lu, daemons)
761
762
763 delay = self.MAX_DELAY
764 for diskie in self._queue:
765 if not diskie.active:
766 continue
767
768 try:
769 try:
770 all_daemon_data = data[diskie.node_name]
771 except KeyError:
772 result = diskie.SetDaemonData(False, None)
773 else:
774 result = \
775 diskie.SetDaemonData(True,
776 all_daemon_data[diskie.GetDaemonName()])
777
778 if not result:
779
780 delay = min(3.0, delay)
781 continue
782
783 if diskie.CheckFinished():
784
785 diskie.Finalize()
786 continue
787
788
789 delay = min(5.0, delay)
790
791 if not diskie.CheckListening():
792
793 delay = min(1.0, delay)
794 continue
795
796 if not diskie.CheckConnected():
797
798 delay = min(1.0, delay)
799 continue
800
801 except _ImportExportError, err:
802 logging.exception("%s failed", diskie.MODE_TEXT)
803 diskie.Finalize(error=str(err))
804
805 if not compat.any(diskie.active for diskie in self._queue):
806 break
807
808
809 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
810 logging.debug("Waiting for %ss", delay)
811 time.sleep(delay)
812
814 """Finalizes all pending transfers.
815
816 """
817 success = True
818
819 for diskie in self._queue:
820 success = diskie.Finalize() and success
821
822 return success
823
826 - def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
827 dest_node, dest_ip):
828 """Initializes this class.
829
830 """
831 ImportExportCbBase.__init__(self)
832
833 self.lu = lu
834 self.feedback_fn = feedback_fn
835 self.instance = instance
836 self.timeouts = timeouts
837 self.src_node = src_node
838 self.src_cbs = src_cbs
839 self.dest_node = dest_node
840 self.dest_ip = dest_ip
841
845 """Called when a connection has been established.
846
847 """
848 assert self.src_cbs is None
849 assert dtp.src_export == ie
850 assert dtp.dest_import
851
852 self.feedback_fn("%s is sending data on %s" %
853 (dtp.data.name, ie.node_name))
854
864
866 """Called when a transfer has finished.
867
868 """
869 assert self.src_cbs is None
870 assert dtp.src_export == ie
871 assert dtp.dest_import
872
873 if ie.success:
874 self.feedback_fn("%s finished sending data" % dtp.data.name)
875 else:
876 self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
877 (dtp.data.name, ie.final_message, ie.recent_output))
878
879 dtp.RecordResult(ie.success)
880
881 cb = dtp.data.finished_fn
882 if cb:
883 cb()
884
885
886
887 if dtp.dest_import and not ie.success:
888 dtp.dest_import.Abort()
889
893 """Called when daemon started listening.
894
895 """
896 assert self.src_cbs
897 assert dtp.src_export is None
898 assert dtp.dest_import
899 assert dtp.export_opts
900
901 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
902
903
904 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
905 self.dest_ip, ie.listen_port, self.instance,
906 component, dtp.data.src_io, dtp.data.src_ioargs,
907 self.timeouts, self.src_cbs, private=dtp)
908 ie.loop.Add(de)
909
910 dtp.src_export = de
911
913 """Called when a connection has been established.
914
915 """
916 self.feedback_fn("%s is receiving data on %s" %
917 (dtp.data.name, self.dest_node))
918
920 """Called when a transfer has finished.
921
922 """
923 if ie.success:
924 self.feedback_fn("%s finished receiving data" % dtp.data.name)
925 else:
926 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
927 (dtp.data.name, ie.final_message, ie.recent_output))
928
929 dtp.RecordResult(ie.success)
930
931
932
933 if dtp.src_export and not ie.success:
934 dtp.src_export.Abort()
935
938 - def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
939 finished_fn):
940 """Initializes this class.
941
942 @type name: string
943 @param name: User-visible name for this transfer (e.g. "disk/0")
944 @param src_io: Source I/O type
945 @param src_ioargs: Source I/O arguments
946 @param dest_io: Destination I/O type
947 @param dest_ioargs: Destination I/O arguments
948 @type finished_fn: callable
949 @param finished_fn: Function called once transfer has finished
950
951 """
952 self.name = name
953
954 self.src_io = src_io
955 self.src_ioargs = src_ioargs
956
957 self.dest_io = dest_io
958 self.dest_ioargs = dest_ioargs
959
960 self.finished_fn = finished_fn
961
964 - def __init__(self, data, success, export_opts):
965 """Initializes this class.
966
967 @type data: L{DiskTransfer}
968 @type success: bool
969
970 """
971 self.data = data
972 self.success = success
973 self.export_opts = export_opts
974
975 self.src_export = None
976 self.dest_import = None
977
979 """Updates the status.
980
981 One failed part will cause the whole transfer to fail.
982
983 """
984 self.success = self.success and success
985
988 """Computes the magic value for a disk export or import.
989
990 @type base: string
991 @param base: Random seed value (can be the same for all disks of a transfer)
992 @type instance_name: string
993 @param instance_name: Name of instance
994 @type index: number
995 @param index: Disk index
996
997 """
998 h = compat.sha1_hash()
999 h.update(str(constants.RIE_VERSION))
1000 h.update(base)
1001 h.update(instance_name)
1002 h.update(str(index))
1003 return h.hexdigest()
1004
1005
1006 -def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1007 instance, all_transfers):
1008 """Transfers an instance's data from one node to another.
1009
1010 @param lu: Logical unit instance
1011 @param feedback_fn: Feedback function
1012 @type src_node: string
1013 @param src_node: Source node name
1014 @type dest_node: string
1015 @param dest_node: Destination node name
1016 @type dest_ip: string
1017 @param dest_ip: IP address of destination node
1018 @type instance: L{objects.Instance}
1019 @param instance: Instance object
1020 @type all_transfers: list of L{DiskTransfer} instances
1021 @param all_transfers: List of all disk transfers to be made
1022 @rtype: list
1023 @return: List with a boolean (True=successful, False=failed) for success for
1024 each transfer
1025
1026 """
1027
1028 compress = constants.IEC_NONE
1029
1030 logging.debug("Source node %s, destination node %s, compression '%s'",
1031 src_node, dest_node, compress)
1032
1033 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1034 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1035 src_node, None, dest_node, dest_ip)
1036 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1037 src_node, src_cbs, dest_node, dest_ip)
1038
1039 all_dtp = []
1040
1041 base_magic = utils.GenerateSecret(6)
1042
1043 ieloop = ImportExportLoop(lu)
1044 try:
1045 for idx, transfer in enumerate(all_transfers):
1046 if transfer:
1047 feedback_fn("Exporting %s from %s to %s" %
1048 (transfer.name, src_node, dest_node))
1049
1050 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1051 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1052 compress=compress, magic=magic)
1053
1054 dtp = _DiskTransferPrivate(transfer, True, opts)
1055
1056 di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1057 transfer.dest_io, transfer.dest_ioargs,
1058 timeouts, dest_cbs, private=dtp)
1059 ieloop.Add(di)
1060
1061 dtp.dest_import = di
1062 else:
1063 dtp = _DiskTransferPrivate(None, False, None)
1064
1065 all_dtp.append(dtp)
1066
1067 ieloop.Run()
1068 finally:
1069 ieloop.FinalizeAll()
1070
1071 assert len(all_dtp) == len(all_transfers)
1072 assert compat.all((dtp.src_export is None or
1073 dtp.src_export.success is not None) and
1074 (dtp.dest_import is None or
1075 dtp.dest_import.success is not None)
1076 for dtp in all_dtp), \
1077 "Not all imports/exports are finalized"
1078
1079 return [bool(dtp.success) for dtp in all_dtp]
1080
1083 - def __init__(self, feedback_fn, disk_count):
1084 """Initializes this class.
1085
1086 """
1087 ImportExportCbBase.__init__(self)
1088 self._feedback_fn = feedback_fn
1089 self._dresults = [None] * disk_count
1090
1091 @property
1093 """Returns per-disk results.
1094
1095 """
1096 return self._dresults
1097
1099 """Called when a connection has been established.
1100
1101 """
1102 (idx, _) = private
1103
1104 self._feedback_fn("Disk %s is now sending data" % idx)
1105
1107 """Called when new progress information should be reported.
1108
1109 """
1110 (idx, _) = private
1111
1112 progress = ie.progress
1113 if not progress:
1114 return
1115
1116 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1117
1119 """Called when a transfer has finished.
1120
1121 """
1122 (idx, finished_fn) = private
1123
1124 if ie.success:
1125 self._feedback_fn("Disk %s finished sending data" % idx)
1126 else:
1127 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1128 (idx, ie.final_message, ie.recent_output))
1129
1130 self._dresults[idx] = bool(ie.success)
1131
1132 if finished_fn:
1133 finished_fn()
1134
1137 - def __init__(self, lu, feedback_fn, instance):
1138 """Initializes this class.
1139
1140 @param lu: Logical unit instance
1141 @param feedback_fn: Feedback function
1142 @type instance: L{objects.Instance}
1143 @param instance: Instance object
1144
1145 """
1146 self._lu = lu
1147 self._feedback_fn = feedback_fn
1148 self._instance = instance
1149
1150 self._snap_disks = []
1151 self._removed_snaps = [False] * len(instance.disks)
1152
1154 """Creates an LVM snapshot for every disk of the instance.
1155
1156 """
1157 assert not self._snap_disks
1158
1159 instance = self._instance
1160 src_node = instance.primary_node
1161
1162 for idx, disk in enumerate(instance.disks):
1163 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1164 (idx, src_node))
1165
1166
1167
1168 result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1169 new_dev = False
1170 msg = result.fail_msg
1171 if msg:
1172 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1173 idx, src_node, msg)
1174 elif (not isinstance(result.payload, (tuple, list)) or
1175 len(result.payload) != 2):
1176 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1177 " result '%s'", idx, src_node, result.payload)
1178 else:
1179 disk_id = tuple(result.payload)
1180 disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1181 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1182 logical_id=disk_id, physical_id=disk_id,
1183 iv_name=disk.iv_name,
1184 params=disk_params)
1185
1186 self._snap_disks.append(new_dev)
1187
1188 assert len(self._snap_disks) == len(instance.disks)
1189 assert len(self._removed_snaps) == len(instance.disks)
1190
1192 """Removes an LVM snapshot.
1193
1194 @type disk_index: number
1195 @param disk_index: Index of the snapshot to be removed
1196
1197 """
1198 disk = self._snap_disks[disk_index]
1199 if disk and not self._removed_snaps[disk_index]:
1200 src_node = self._instance.primary_node
1201
1202 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1203 (disk_index, src_node))
1204
1205 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1206 if result.fail_msg:
1207 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1208 " %s: %s", disk_index, src_node, result.fail_msg)
1209 else:
1210 self._removed_snaps[disk_index] = True
1211
1213 """Intra-cluster instance export.
1214
1215 @type dest_node: L{objects.Node}
1216 @param dest_node: Destination node
1217
1218 """
1219 instance = self._instance
1220 src_node = instance.primary_node
1221
1222 assert len(self._snap_disks) == len(instance.disks)
1223
1224 transfers = []
1225
1226 for idx, dev in enumerate(self._snap_disks):
1227 if not dev:
1228 transfers.append(None)
1229 continue
1230
1231 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1232 dev.physical_id[1])
1233
1234 finished_fn = compat.partial(self._TransferFinished, idx)
1235
1236
1237 dt = DiskTransfer("snapshot/%s" % idx,
1238 constants.IEIO_SCRIPT, (dev, idx),
1239 constants.IEIO_FILE, (path, ),
1240 finished_fn)
1241 transfers.append(dt)
1242
1243
1244 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1245 src_node, dest_node.name,
1246 dest_node.secondary_ip,
1247 instance, transfers)
1248
1249 assert len(dresults) == len(instance.disks)
1250
1251 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1252 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1253 self._snap_disks)
1254 msg = result.fail_msg
1255 fin_resu = not msg
1256 if msg:
1257 self._lu.LogWarning("Could not finalize export for instance %s"
1258 " on node %s: %s", instance.name, dest_node.name, msg)
1259
1260 return (fin_resu, dresults)
1261
1262 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1263 """Inter-cluster instance export.
1264
1265 @type disk_info: list
1266 @param disk_info: Per-disk destination information
1267 @type key_name: string
1268 @param key_name: Name of X509 key to use
1269 @type dest_ca_pem: string
1270 @param dest_ca_pem: Destination X509 CA in PEM format
1271 @type timeouts: L{ImportExportTimeouts}
1272 @param timeouts: Timeouts for this import
1273
1274 """
1275 instance = self._instance
1276
1277 assert len(disk_info) == len(instance.disks)
1278
1279 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1280
1281 ieloop = ImportExportLoop(self._lu)
1282 try:
1283 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1284 disk_info)):
1285
1286 ipv6 = netutils.IP6Address.IsValid(host)
1287
1288 opts = objects.ImportExportOptions(key_name=key_name,
1289 ca_pem=dest_ca_pem,
1290 magic=magic, ipv6=ipv6)
1291
1292 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1293 finished_fn = compat.partial(self._TransferFinished, idx)
1294 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1295 opts, host, port, instance, "disk%d" % idx,
1296 constants.IEIO_SCRIPT, (dev, idx),
1297 timeouts, cbs, private=(idx, finished_fn)))
1298
1299 ieloop.Run()
1300 finally:
1301 ieloop.FinalizeAll()
1302
1303 return (True, cbs.disk_results)
1304
1306 """Called once a transfer has finished.
1307
1308 @type idx: number
1309 @param idx: Disk index
1310
1311 """
1312 logging.debug("Transfer %s finished", idx)
1313 self._RemoveSnapshot(idx)
1314
1316 """Remove all snapshots.
1317
1318 """
1319 assert len(self._removed_snaps) == len(self._instance.disks)
1320 for idx in range(len(self._instance.disks)):
1321 self._RemoveSnapshot(idx)
1322
1325 - def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1326 external_address):
1327 """Initializes this class.
1328
1329 @type cds: string
1330 @param cds: Cluster domain secret
1331 @type x509_cert_pem: string
1332 @param x509_cert_pem: CA used for signing import key
1333 @type disk_count: number
1334 @param disk_count: Number of disks
1335 @type external_address: string
1336 @param external_address: External address of destination node
1337
1338 """
1339 ImportExportCbBase.__init__(self)
1340 self._feedback_fn = feedback_fn
1341 self._cds = cds
1342 self._x509_cert_pem = x509_cert_pem
1343 self._disk_count = disk_count
1344 self._external_address = external_address
1345
1346 self._dresults = [None] * disk_count
1347 self._daemon_port = [None] * disk_count
1348
1349 self._salt = utils.GenerateSecret(8)
1350
1351 @property
1353 """Returns per-disk results.
1354
1355 """
1356 return self._dresults
1357
1359 """Checks whether all daemons are listening.
1360
1361 If all daemons are listening, the information is sent to the client.
1362
1363 """
1364 if not compat.all(dp is not None for dp in self._daemon_port):
1365 return
1366
1367 host = self._external_address
1368
1369 disks = []
1370 for idx, (port, magic) in enumerate(self._daemon_port):
1371 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1372 idx, host, port, magic))
1373
1374 assert len(disks) == self._disk_count
1375
1376 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1377 "disks": disks,
1378 "x509_ca": self._x509_cert_pem,
1379 })
1380
1382 """Called when daemon started listening.
1383
1384 """
1385 (idx, ) = private
1386
1387 self._feedback_fn("Disk %s is now listening" % idx)
1388
1389 assert self._daemon_port[idx] is None
1390
1391 self._daemon_port[idx] = (ie.listen_port, ie.magic)
1392
1393 self._CheckAllListening()
1394
1396 """Called when a connection has been established.
1397
1398 """
1399 (idx, ) = private
1400
1401 self._feedback_fn("Disk %s is now receiving data" % idx)
1402
1404 """Called when a transfer has finished.
1405
1406 """
1407 (idx, ) = private
1408
1409
1410 self._daemon_port[idx] = None
1411
1412 if ie.success:
1413 self._feedback_fn("Disk %s finished receiving data" % idx)
1414 else:
1415 self._feedback_fn(("Disk %s failed to receive data: %s"
1416 " (recent output: %s)") %
1417 (idx, ie.final_message, ie.recent_output))
1418
1419 self._dresults[idx] = bool(ie.success)
1420
1421
1422 -def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1423 cds, timeouts):
1424 """Imports an instance from another cluster.
1425
1426 @param lu: Logical unit instance
1427 @param feedback_fn: Feedback function
1428 @type instance: L{objects.Instance}
1429 @param instance: Instance object
1430 @type pnode: L{objects.Node}
1431 @param pnode: Primary node of instance as an object
1432 @type source_x509_ca: OpenSSL.crypto.X509
1433 @param source_x509_ca: Import source's X509 CA
1434 @type cds: string
1435 @param cds: Cluster domain secret
1436 @type timeouts: L{ImportExportTimeouts}
1437 @param timeouts: Timeouts for this import
1438
1439 """
1440 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1441 source_x509_ca)
1442
1443 magic_base = utils.GenerateSecret(6)
1444
1445
1446 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1447
1448
1449 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1450 constants.RIE_CERT_VALIDITY)
1451 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1452
1453 (x509_key_name, x509_cert_pem) = result.payload
1454 try:
1455
1456 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1457 x509_cert_pem)
1458
1459
1460 signed_x509_cert_pem = \
1461 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1462
1463 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1464 len(instance.disks), pnode.primary_ip)
1465
1466 ieloop = ImportExportLoop(lu)
1467 try:
1468 for idx, dev in enumerate(instance.disks):
1469 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1470
1471
1472 opts = objects.ImportExportOptions(key_name=x509_key_name,
1473 ca_pem=source_ca_pem,
1474 magic=magic, ipv6=ipv6)
1475
1476 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1477 "disk%d" % idx,
1478 constants.IEIO_SCRIPT, (dev, idx),
1479 timeouts, cbs, private=(idx, )))
1480
1481 ieloop.Run()
1482 finally:
1483 ieloop.FinalizeAll()
1484 finally:
1485
1486 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1487 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1488
1489 return cbs.disk_results
1490
1493 """Returns the handshake message for a RIE protocol version.
1494
1495 @type version: number
1496
1497 """
1498 return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1499
1511
1514 """Checks the handshake of a remote import/export.
1515
1516 @type cds: string
1517 @param cds: Cluster domain secret
1518 @type handshake: sequence
1519 @param handshake: Handshake sent by remote peer
1520
1521 """
1522 try:
1523 (version, hmac_digest, hmac_salt) = handshake
1524 except (TypeError, ValueError), err:
1525 return "Invalid data: %s" % err
1526
1527 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1528 hmac_digest, salt=hmac_salt):
1529 return "Hash didn't match, clusters don't share the same domain secret"
1530
1531 if version != constants.RIE_VERSION:
1532 return ("Clusters don't have the same remote import/export protocol"
1533 " (local=%s, remote=%s)" %
1534 (constants.RIE_VERSION, version))
1535
1536 return None
1537
1540 """Returns the hashed text for import/export disk information.
1541
1542 @type disk_index: number
1543 @param disk_index: Index of disk (included in hash)
1544 @type host: string
1545 @param host: Hostname
1546 @type port: number
1547 @param port: Daemon port
1548 @type magic: string
1549 @param magic: Magic value
1550
1551 """
1552 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1553
1556 """Verifies received disk information for an export.
1557
1558 @type cds: string
1559 @param cds: Cluster domain secret
1560 @type disk_index: number
1561 @param disk_index: Index of disk (included in hash)
1562 @type disk_info: sequence
1563 @param disk_info: Disk information sent by remote peer
1564
1565 """
1566 try:
1567 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1568 except (TypeError, ValueError), err:
1569 raise errors.GenericError("Invalid data: %s" % err)
1570
1571 if not (host and port and magic):
1572 raise errors.GenericError("Missing destination host, port or magic")
1573
1574 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1575
1576 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1577 raise errors.GenericError("HMAC is wrong")
1578
1579 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1580 destination = host
1581 else:
1582 destination = netutils.Hostname.GetNormalizedName(host)
1583
1584 return (destination,
1585 utils.ValidateServiceName(port),
1586 magic)
1587
1590 """Computes the signed disk information for a remote import.
1591
1592 @type cds: string
1593 @param cds: Cluster domain secret
1594 @type salt: string
1595 @param salt: HMAC salt
1596 @type disk_index: number
1597 @param disk_index: Index of disk (included in hash)
1598 @type host: string
1599 @param host: Hostname
1600 @type port: number
1601 @param port: Daemon port
1602 @type magic: string
1603 @param magic: Magic value
1604
1605 """
1606 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1607 hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1608 return (host, port, magic, hmac_digest, salt)
1609
1612 """Calculate instance policy for group.
1613
1614 """
1615 return cluster.SimpleFillIPolicy(group.ipolicy)
1616
1641