Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
36 37 38 -class _ImportExportError(Exception):
39 """Local exception to report import/export errors. 40 41 """
42
class ImportExportTimeouts(object):
  """Bundle of the timeout values used by one import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "connect",
    "error",
    "listen",
    "progress",
    "ready",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes the timeout values.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
89
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Subclasses override the notifications they care about; every default
  implementation is a no-op.

  """
  def ReportListening(self, ie, private):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Invoked when fresh progress information is available.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
130
131 132 -def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133 """Checks whether a timeout has expired. 134 135 """ 136 return _time_fn() > (epoch + timeout)
137
class _DiskImportExportBase(object):
  """Base class tracking one disk import/export daemon on one node.

  Drives the remote daemon's lifecycle: start, become ready, (listen,)
  connect, transfer, finish and final cleanup.  The C{Check*} methods are
  polled from L{ImportExportLoop.Run}.

  """
  # Human-readable mode name for log messages; set by subclasses
  # ("import"/"export")
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Copy the options so setting the connect timeout below doesn't
    # modify the caller's object
    self._opts = opts.Copy()
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps; each is set exactly once when the corresponding phase
    # is entered, and None until then
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @rtype: string or None

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: C{None} if no status data is available yet, otherwise a tuple
      of (mbytes, throughput, percent, ETA)

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    # Finished transfers (successful or not) have a non-None success flag
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: if a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses (import vs. export use different
    RPC calls).

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    @rtype: string

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort request was sent successfully (also True
      when there was no daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available

    """
    assert self._ts_begin is not None

    if not data:
      # No data yet; fail hard once the ready timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available

    """
    if not success:
      # Remember when errors started; only fail hard after they have
      # persisted past the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: when the connect timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports at most once per progress interval, and only once the daemon
    has supplied actual progress numbers.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon still running; this is a good moment for a progress update
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    # Exit status 0 means success; anything else is a failure
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon never started, e.g. the start RPC itself failed
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Error message, if any
    @rtype: bool
    @return: Whether finalization succeeded

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
483
class DiskImport(_DiskImportExportBase):
  """Handles the receiving (import) side of a single disk transfer.

  An import daemon listens for a connection; the matching export daemon
  connects to it.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps; set once the daemon reported its listening port
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port number, or C{None} if no status data is available yet

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: when the listen timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    The connect timeout for an import only starts counting once the
    daemon is listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
572
class DiskExport(_DiskImportExportBase):
  """Handles the sending (export) side of a single disk transfer.

  The export daemon connects out to an already-listening import daemon.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._source = source
    self._source_args = source_args
    self._dest_host = dest_host
    self._dest_port = dest_port

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
632
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, ETA) as returned
    by L{_DiskImportExportBase.progress}
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  # Not using FormatUnit for the throughput as it doesn't support kilobytes
  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
654
class ImportExportLoop:
  """Polling loop driving a set of import/export objects to completion.

  """
  # Bounds for the sleep between polling rounds, in seconds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @type daemons: dict
    @param daemons: Dictionary mapping node name to a list of daemon names
    @rtype: dict
    @return: Dictionary mapping node name to a dictionary of daemon name to
      status data; nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Also starts daemons which haven't been started yet; objects whose
    daemon fails to start are finalized with an error.

    @rtype: dict
    @return: Dictionary mapping node name to a list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all queued import/export objects until none of them is active
    any longer, sleeping between rounds.  The sleep delay shrinks when any
    object is waiting on a state change (ready/listening/connecting).

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
819
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for instance-transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for the transfers
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side, if any
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
837
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" % (dtp.data.name,
                                       FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if not ie.success:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished sending data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
885
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import daemon is ready; start the matching export on the source
    # node, pointing it at the import's listening port
    exporter = DiskExport(self.lu, self.src_node, dtp.export_opts,
                          self.dest_ip, ie.listen_port,
                          self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                          self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(exporter)

    dtp.src_export = exporter

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if not ie.success:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
931
class DiskTransfer(object):
  """Describes a single disk transfer.

  Plain data holder; the actual work is done by L{TransferInstanceData}.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
957
958 959 -class _DiskTransferPrivate(object):
960 - def __init__(self, data, success, export_opts):
961 """Initializes this class. 962 963 @type data: L{DiskTransfer} 964 @type success: bool 965 966 """ 967 self.data = data 968 self.success = success 969 self.export_opts = export_opts 970 971 self.src_export = None 972 self.dest_import = None
973
974 - def RecordResult(self, success):
975 """Updates the status. 976 977 One failed part will cause the whole transfer to fail. 978 979 """ 980 self.success = self.success and success
981
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hexadecimal digest

  """
  h = compat.sha1_hash()
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(part)
  return h.hexdigest()
1000
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made; C{None}
    entries are skipped and reported as failed
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # Only the import is started here; the matching export is created
        # by _TransferInstDestCb.ReportListening once the import daemon
        # reported its listening port
        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped transfer, recorded as failed
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always clean up daemons, even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
    "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
1076
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting an instance's disks.

  Private callback data is a tuple of (disk index, finished function).

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % idx)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1130
class ExportInstanceHelper:
  """Helper driving the snapshot/export steps of an instance export.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Tracks which snapshots have been removed again, one flag per disk
    self._removed_snaps = [False] * len(instance.disks)
    # Filled with one entry per disk by CreateSnapshots
    self._snap_disks = []
1148
  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Appends one entry per disk to C{self._snap_disks}: a new
    L{objects.Disk} on success, C{False} when the snapshot failed (such
    entries are skipped by C{LocalExport}).

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        # Sanity check on the RPC result before using it as a logical ID
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)
1184
1185 - def _RemoveSnapshot(self, disk_index):
1186 """Removes an LVM snapshot. 1187 1188 @type disk_index: number 1189 @param disk_index: Index of the snapshot to be removed 1190 1191 """ 1192 disk = self._snap_disks[disk_index] 1193 if disk and not self._removed_snaps[disk_index]: 1194 src_node = self._instance.primary_node 1195 1196 self._feedback_fn("Removing snapshot of disk/%s on node %s" % 1197 (disk_index, src_node)) 1198 1199 result = self._lu.rpc.call_blockdev_remove(src_node, disk) 1200 if result.fail_msg: 1201 self._lu.LogWarning("Could not remove snapshot for disk/%d from node" 1202 " %s: %s", disk_index, src_node, result.fail_msg) 1203 else: 1204 self._removed_snaps[disk_index] = True
1205
1206 - def LocalExport(self, dest_node):
1207 """Intra-cluster instance export. 1208 1209 @type dest_node: L{objects.Node} 1210 @param dest_node: Destination node 1211 1212 """ 1213 instance = self._instance 1214 src_node = instance.primary_node 1215 1216 assert len(self._snap_disks) == len(instance.disks) 1217 1218 transfers = [] 1219 1220 for idx, dev in enumerate(self._snap_disks): 1221 if not dev: 1222 transfers.append(None) 1223 continue 1224 1225 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, 1226 dev.physical_id[1]) 1227 1228 finished_fn = compat.partial(self._TransferFinished, idx) 1229 1230 # FIXME: pass debug option from opcode to backend 1231 dt = DiskTransfer("snapshot/%s" % idx, 1232 constants.IEIO_SCRIPT, (dev, idx), 1233 constants.IEIO_FILE, (path, ), 1234 finished_fn) 1235 transfers.append(dt) 1236 1237 # Actually export data 1238 dresults = TransferInstanceData(self._lu, self._feedback_fn, 1239 src_node, dest_node.name, 1240 dest_node.secondary_ip, 1241 instance, transfers) 1242 1243 assert len(dresults) == len(instance.disks) 1244 1245 self._feedback_fn("Finalizing export on %s" % dest_node.name) 1246 result = self._lu.rpc.call_finalize_export(dest_node.name, instance, 1247 self._snap_disks) 1248 msg = result.fail_msg 1249 fin_resu = not msg 1250 if msg: 1251 self._lu.LogWarning("Could not finalize export for instance %s" 1252 " on node %s: %s", instance.name, dest_node.name, msg) 1253 1254 return (fin_resu, dresults)
1255
1256 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257 """Inter-cluster instance export. 1258 1259 @type disk_info: list 1260 @param disk_info: Per-disk destination information 1261 @type key_name: string 1262 @param key_name: Name of X509 key to use 1263 @type dest_ca_pem: string 1264 @param dest_ca_pem: Destination X509 CA in PEM format 1265 @type timeouts: L{ImportExportTimeouts} 1266 @param timeouts: Timeouts for this import 1267 1268 """ 1269 instance = self._instance 1270 1271 assert len(disk_info) == len(instance.disks) 1272 1273 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) 1274 1275 ieloop = ImportExportLoop(self._lu) 1276 try: 1277 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, 1278 disk_info)): 1279 # Decide whether to use IPv6 1280 ipv6 = netutils.IP6Address.IsValid(host) 1281 1282 opts = objects.ImportExportOptions(key_name=key_name, 1283 ca_pem=dest_ca_pem, 1284 magic=magic, ipv6=ipv6) 1285 1286 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) 1287 finished_fn = compat.partial(self._TransferFinished, idx) 1288 ieloop.Add(DiskExport(self._lu, instance.primary_node, 1289 opts, host, port, instance, 1290 constants.IEIO_SCRIPT, (dev, idx), 1291 timeouts, cbs, private=(idx, finished_fn))) 1292 1293 ieloop.Run() 1294 finally: 1295 ieloop.FinalizeAll() 1296 1297 return (True, cbs.disk_results)
1298
1299 - def _TransferFinished(self, idx):
1300 """Called once a transfer has finished. 1301 1302 @type idx: number 1303 @param idx: Disk index 1304 1305 """ 1306 logging.debug("Transfer %s finished", idx) 1307 self._RemoveSnapshot(idx)
1308
1309 - def Cleanup(self):
1310 """Remove all snapshots. 1311 1312 """ 1313 assert len(self._removed_snaps) == len(self._instance.disks) 1314 for idx in range(len(self._instance.disks)): 1315 self._RemoveSnapshot(idx)
1316
class _RemoteImportCb(ImportExportCbBase):
  """Import/export callbacks for a remote instance import.

  Collects listening ports from all per-disk import daemons and, once every
  daemon is ready, sends the signed connection details to the client.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk success flags; set in ReportFinished
    self._dresults = [None] * disk_count
    # Per-disk (listen port, magic) pairs; None while a daemon isn't listening
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if None in self._daemon_port:
      # At least one daemon hasn't reported its port yet
      return

    host = self._external_address

    # Build signed connection information for every disk
    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         idx, host, port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    # Each daemon must report exactly once
    assert self._daemon_port[idx] is None
    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private
    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    success = bool(ie.success)

    if success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn("Disk %s failed to receive data: %s"
                        " (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = success
1414
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  A temporary X509 key/certificate pair is created on the instance's primary
  node for the duration of the import and removed again afterwards,
  regardless of success or failure.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results collected by the import callbacks

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Base for per-disk magic values
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate with the cluster domain secret so the source cluster
    # can verify it
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        # Per-disk magic value, derived from the shared base
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      # Always tear down the import daemons
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1483
def _GetImportExportHandshakeMessage(version):
  """Builds the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @return: Message string combining version and handshake constant

  """
  msg_parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % msg_parts
1492
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: Tuple of (protocol version, HMAC digest, salt)

  """
  # Fresh salt per handshake
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = \
    utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(constants.RIE_VERSION),
                   salt=hmac_salt)
  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
1504
1505 1506 -def CheckRemoteExportHandshake(cds, handshake):
1507 """Checks the handshake of a remote import/export. 1508 1509 @type cds: string 1510 @param cds: Cluster domain secret 1511 @type handshake: sequence 1512 @param handshake: Handshake sent by remote peer 1513 1514 """ 1515 try: 1516 (version, hmac_digest, hmac_salt) = handshake 1517 except (TypeError, ValueError), err: 1518 return "Invalid data: %s" % err 1519 1520 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), 1521 hmac_digest, salt=hmac_salt): 1522 return "Hash didn't match, clusters don't share the same domain secret" 1523 1524 if version != constants.RIE_VERSION: 1525 return ("Clusters don't have the same remote import/export protocol" 1526 " (local=%s, remote=%s)" % 1527 (constants.RIE_VERSION, version)) 1528 1529 return None
1530
1531 1532 -def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1533 """Returns the hashed text for import/export disk information. 1534 1535 @type disk_index: number 1536 @param disk_index: Index of disk (included in hash) 1537 @type host: string 1538 @param host: Hostname 1539 @type port: number 1540 @param port: Daemon port 1541 @type magic: string 1542 @param magic: Magic value 1543 1544 """ 1545 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1546
1547 1548 -def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1549 """Verifies received disk information for an export. 1550 1551 @type cds: string 1552 @param cds: Cluster domain secret 1553 @type disk_index: number 1554 @param disk_index: Index of disk (included in hash) 1555 @type disk_info: sequence 1556 @param disk_info: Disk information sent by remote peer 1557 1558 """ 1559 try: 1560 (host, port, magic, hmac_digest, hmac_salt) = disk_info 1561 except (TypeError, ValueError), err: 1562 raise errors.GenericError("Invalid data: %s" % err) 1563 1564 if not (host and port and magic): 1565 raise errors.GenericError("Missing destination host, port or magic") 1566 1567 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1568 1569 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): 1570 raise errors.GenericError("HMAC is wrong") 1571 1572 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): 1573 destination = host 1574 else: 1575 destination = netutils.Hostname.GetNormalizedName(host) 1576 1577 return (destination, 1578 utils.ValidateServiceName(port), 1579 magic)
1580
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Builds the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: Tuple of (host, port, magic, HMAC digest, salt)

  """
  # Sign the connection parameters so the peer can verify them against the
  # shared cluster domain secret
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port, magic),
                          salt=salt)
  return (host, port, magic, digest, salt)
1602