Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised by the import/export objects in this module (e.g. when a daemon
  fails to start or a timeout expires) and caught by the main loop, which
  then finalizes the affected transfer with the error message.

  """


class ImportExportTimeouts(object):
  """Container for the timeouts used by disk imports/exports.

  All values are in seconds; they are compared against C{time.time()}
  differences by the import/export objects.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection (no default)
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # The connect timeout is the only mandatory value
    self.connect = connect

    # The remaining values fall back to the class-level defaults
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Every hook is a no-op here; subclasses override only the ones they need.

  """
  def ReportListening(self, ie, private):
    """Hook invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Hook invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Hook invoked whenever new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Hook invoked once a transfer has finished (successfully or not).

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """


def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  @param epoch: Start time, in the same unit as returned by C{_time_fn}
  @param timeout: Length of the timeout, in the same unit
  @param _time_fn: Clock function (overridable for testing)
  @rtype: bool
  @return: True if strictly more than C{timeout} has passed since C{epoch}

  """
  now = _time_fn()
  deadline = epoch + timeout
  return now > deadline


class _DiskImportExportBase(object):
  """Common base class for disk imports and exports.

  Subclasses must set L{MODE_TEXT} and implement L{_StartDaemon},
  L{CheckListening} and L{_GetConnectedCheckEpoch}.

  """
  #: Mode name used in log and error messages; must be set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    # Only subclasses defining a mode name may be instantiated
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @return: The C{recent_output} field of the daemon status, or None if no
      status has been received yet

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @rtype: tuple or None
    @return: (mbytes, throughput, percent, eta) taken from the daemon
      status, or None if no status has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    @rtype: bool
    @return: True until a final result has been recorded

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop has already been set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses.

    @return: RPC result whose payload is the daemon name

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    @rtype: string

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: False if the abort RPC failed, True otherwise (including when
      no daemon is running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available
    @raise _ImportExportError: If no data arrived within the ready timeout

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether usable status data was recorded
    @raise _ImportExportError: If fetching data kept failing for longer than
      the error timeout, or the daemon didn't become ready in time

    """
    if not success:
      # Remember when the current streak of errors started
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the timestamp from which the connect timeout is measured.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If not connected within the connect timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports at most once per progress interval, and only once the daemon has
    provided both the transferred amount and the throughput.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    # Compare against None like the other timestamps instead of relying on
    # the truthiness of the stored time value
    if self._ts_finished is not None:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    # A final result may only be recorded once
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    @return: RPC result of the cleanup call

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    Cleans up the daemon (if one was started) and, if C{error} is given,
    records the transfer as failed.

    @type error: string
    @param error: Error message; if set, the transfer is reported as failed
    @rtype: bool
    @return: False if the cleanup RPC failed, True otherwise

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True


class DiskImport(_DiskImportExportBase):
  """Disk import: a daemon on a node that listens for incoming data.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Set once the daemon reports a listening port
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port from the daemon status, or None if no status is available

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon on the target node.

    @return: RPC result whose payload is the daemon name

    """
    rpc = self._lu.rpc
    return rpc.call_import_start(self.node_name, self._opts, self._instance,
                                 self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If not listening within the listen timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is None:
      port = self._daemon.listen_port

      if port is None:
        # Still waiting for the daemon to bind its port
        if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
          raise _ImportExportError("Not listening after %s seconds" %
                                   self._timeouts.listen)
        return False

      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    The connect timeout for an import only starts once it is listening.

    """
    ts_listening = self._ts_listening

    assert ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return ts_listening


class DiskExport(_DiskImportExportBase):
  """Disk export: sends data from a node to a remote host and port.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    # Where to send the data
    self._dest_host = dest_host
    self._dest_port = dest_port

    # Where to read the data from
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon on the source node.

    @return: RPC result whose payload is the daemon name

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._opts,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Always True, exports connect rather than listen

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    ts_begin = self._ts_begin
    assert ts_begin is not None
    return ts_begin


def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, eta); C{percent}
    and C{eta} may be None, in which case they are left out
  @rtype: string
  @return: The formatted parts joined via C{utils.CommaJoin}

  """
  (mbytes, throughput, percent, eta) = progress

  # Not using FormatUnit for the throughput as it doesn't support kilobytes
  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  # Optional values, in output order, together with their formatter
  optional = [
    (percent, lambda value: "%d%%" % value),
    (eta, lambda value: "ETA %s" % utils.FormatSeconds(value)),
    ]
  parts.extend([fmt(value) for (value, fmt) in optional if value is not None])

  return utils.CommaJoin(parts)


651 652 -class ImportExportLoop:
653 MIN_DELAY = 1.0 654 MAX_DELAY = 20.0 655
656 - def __init__(self, lu):
657 """Initializes this class. 658 659 """ 660 self._lu = lu 661 self._queue = [] 662 self._pending_add = []
663
664 - def Add(self, diskie):
665 """Adds an import/export object to the loop. 666 667 @type diskie: Subclass of L{_DiskImportExportBase} 668 @param diskie: Import/export object 669 670 """ 671 assert diskie not in self._pending_add 672 assert diskie.loop is None 673 674 diskie.SetLoop(self) 675 676 # Adding new objects to a staging list is necessary, otherwise the main 677 # loop gets confused if callbacks modify the queue while the main loop is 678 # iterating over it. 679 self._pending_add.append(diskie)
680 681 @staticmethod
682 - def _CollectDaemonStatus(lu, daemons):
683 """Collects the status for all import/export daemons. 684 685 """ 686 daemon_status = {} 687 688 for node_name, names in daemons.iteritems(): 689 result = lu.rpc.call_impexp_status(node_name, names) 690 if result.fail_msg: 691 lu.LogWarning("Failed to get daemon status on %s: %s", 692 node_name, result.fail_msg) 693 continue 694 695 assert len(names) == len(result.payload) 696 697 daemon_status[node_name] = dict(zip(names, result.payload)) 698 699 return daemon_status
700 701 @staticmethod
702 - def _GetActiveDaemonNames(queue):
703 """Gets the names of all active daemons. 704 705 """ 706 result = {} 707 for diskie in queue: 708 if not diskie.active: 709 continue 710 711 try: 712 # Start daemon if necessary 713 daemon_name = diskie.CheckDaemon() 714 except _ImportExportError, err: 715 logging.exception("%s failed", diskie.MODE_TEXT) 716 diskie.Finalize(error=str(err)) 717 continue 718 719 result.setdefault(diskie.node_name, []).append(daemon_name) 720 721 assert len(queue) >= len(result) 722 assert len(queue) >= sum([len(names) for names in result.itervalues()]) 723 724 logging.debug("daemons=%r", result) 725 726 return result
727
728 - def _AddPendingToQueue(self):
729 """Adds all pending import/export objects to the internal queue. 730 731 """ 732 assert compat.all(diskie not in self._queue and diskie.loop == self 733 for diskie in self._pending_add) 734 735 self._queue.extend(self._pending_add) 736 737 del self._pending_add[:]
738
739 - def Run(self):
740 """Utility main loop. 741 742 """ 743 while True: 744 self._AddPendingToQueue() 745 746 # Collect all active daemon names 747 daemons = self._GetActiveDaemonNames(self._queue) 748 if not daemons: 749 break 750 751 # Collection daemon status data 752 data = self._CollectDaemonStatus(self._lu, daemons) 753 754 # Use data 755 delay = self.MAX_DELAY 756 for diskie in self._queue: 757 if not diskie.active: 758 continue 759 760 try: 761 try: 762 all_daemon_data = data[diskie.node_name] 763 except KeyError: 764 result = diskie.SetDaemonData(False, None) 765 else: 766 result = \ 767 diskie.SetDaemonData(True, 768 all_daemon_data[diskie.GetDaemonName()]) 769 770 if not result: 771 # Daemon not yet ready, retry soon 772 delay = min(3.0, delay) 773 continue 774 775 if diskie.CheckFinished(): 776 # Transfer finished 777 diskie.Finalize() 778 continue 779 780 # Normal case: check again in 5 seconds 781 delay = min(5.0, delay) 782 783 if not diskie.CheckListening(): 784 # Not yet listening, retry soon 785 delay = min(1.0, delay) 786 continue 787 788 if not diskie.CheckConnected(): 789 # Not yet connected, retry soon 790 delay = min(1.0, delay) 791 continue 792 793 except _ImportExportError, err: 794 logging.exception("%s failed", diskie.MODE_TEXT) 795 diskie.Finalize(error=str(err)) 796 797 if not compat.any(diskie.active for diskie in self._queue): 798 break 799 800 # Wait a bit 801 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay)) 802 logging.debug("Waiting for %ss", delay) 803 time.sleep(delay)
804
805 - def FinalizeAll(self):
806 """Finalizes all pending transfers. 807 808 """ 809 success = True 810 811 for diskie in self._queue: 812 success = diskie.Finalize() and success 813 814 return success
815
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the callbacks used by L{TransferInstanceData}.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for imports/exports created by the callbacks
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side (None when these are the
      source callbacks themselves)
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    # General context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the source (export) side of an instance transfer.

  The private data is the L{_DiskTransferPrivate} of the transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    node_name = ie.node_name
    self.feedback_fn("%s is sending data on %s" % (dtp.data.name, node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" % (dtp.data.name,
                                       FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result, runs the transfer's finished function (if any) and
    aborts the matching import if sending failed.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if not ie.success:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished sending data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if not ie.success and dtp.dest_import:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the destination (import) side of an instance transfer.

  The private data is the L{_DiskTransferPrivate} of the transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    Starts the matching export on the source node, pointing it at the port
    the import is listening on.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    data = dtp.data

    self.feedback_fn("%s is now listening, starting export" % data.name)

    # Start export on source node
    export = DiskExport(self.lu, self.src_node, dtp.export_opts,
                        self.dest_ip, ie.listen_port,
                        self.instance, data.src_io, data.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    message = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result and aborts the matching export if receiving failed.

    """
    if not ie.success:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if not ie.success and dtp.src_export:
      dtp.src_export.Abort()


class DiskTransfer(object):
  """Description of a single disk transfer (source, destination, callback).

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    (self.name, self.finished_fn) = (name, finished_fn)

    (self.src_io, self.src_ioargs) = (src_io, src_ioargs)

    (self.dest_io, self.dest_ioargs) = (dest_io, dest_ioargs)


class _DiskTransferPrivate(object):
  """Per-transfer state shared by the source and destination callbacks.

  """
  def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @param data: Transfer description
    @type success: bool
    @param success: Initial result
    @param export_opts: Options for the export started once the import
      is listening

    """
    self.data = data
    self.success = success
    self.export_opts = export_opts

    # Filled in once the corresponding objects are created
    self.src_export = self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    # Once the result is falsy it stays as-is; otherwise adopt the new value
    if self.success:
      self.success = success


def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest of the SHA1 hash over version, seed, name and index

  """
  digest = compat.sha1_hash()

  # Order matters: both sides must hash the same values in the same order
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(part)

  return digest.hexdigest()


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  # Random seed shared by the magic values of all disks of this transfer
  base_magic = utils.GenerateSecret(6)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for (idx, transfer) in enumerate(all_transfers):
      if not transfer:
        # Nothing to transfer for this disk; it is reported as failed
        all_dtp.append(_DiskTransferPrivate(None, False, None))
        continue

      feedback_fn("Exporting %s from %s to %s" %
                  (transfer.name, src_node, dest_node))

      opts = objects.ImportExportOptions(
        key_name=None, ca_pem=None, compress=compress,
        magic=_GetInstDiskMagic(base_magic, instance.name, idx))

      dtp = _DiskTransferPrivate(transfer, True, opts)

      # Only the import is started here; the destination callbacks start the
      # export once the import is listening
      dtp.dest_import = DiskImport(lu, dest_node, opts, instance,
                                   transfer.dest_io, transfer.dest_ioargs,
                                   timeouts, dest_cbs, private=dtp)
      ieloop.Add(dtp.dest_import)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for inter-cluster (remote) disk exports.

  The private data of each export is a tuple of (disk index, finished
  function or None).

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn

    # One entry per disk; None until the disk's export has finished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    @rtype: list
    @return: One entry per disk: None while unfinished, otherwise a boolean

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % disk_idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (disk_idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" %
                        (disk_idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    Records the per-disk result and runs the finished function, if any.

    """
    (disk_idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (disk_idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % disk_idx)

    self._dresults[disk_idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


1127 1128 -class ExportInstanceHelper:
1129 - def __init__(self, lu, feedback_fn, instance):
1130 """Initializes this class. 1131 1132 @param lu: Logical unit instance 1133 @param feedback_fn: Feedback function 1134 @type instance: L{objects.Instance} 1135 @param instance: Instance object 1136 1137 """ 1138 self._lu = lu 1139 self._feedback_fn = feedback_fn 1140 self._instance = instance 1141 1142 self._snap_disks = [] 1143 self._removed_snaps = [False] * len(instance.disks)
1144
1145 - def CreateSnapshots(self):
1146 """Creates an LVM snapshot for every disk of the instance. 1147 1148 """ 1149 assert not self._snap_disks 1150 1151 instance = self._instance 1152 src_node = instance.primary_node 1153 1154 vgname = self._lu.cfg.GetVGName() 1155 1156 for idx, disk in enumerate(instance.disks): 1157 self._feedback_fn("Creating a snapshot of disk/%s on node %s" % 1158 (idx, src_node)) 1159 1160 # result.payload will be a snapshot of an lvm leaf of the one we 1161 # passed 1162 result = self._lu.rpc.call_blockdev_snapshot(src_node, disk) 1163 msg = result.fail_msg 1164 if msg: 1165 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s", 1166 idx, src_node, msg) 1167 new_dev = False 1168 else: 1169 disk_id = (vgname, result.payload) 1170 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, 1171 logical_id=disk_id, physical_id=disk_id, 1172 iv_name=disk.iv_name) 1173 1174 self._snap_disks.append(new_dev) 1175 1176 assert len(self._snap_disks) == len(instance.disks) 1177 assert len(self._removed_snaps) == len(instance.disks)
1178
1179 - def _RemoveSnapshot(self, disk_index):
1180 """Removes an LVM snapshot. 1181 1182 @type disk_index: number 1183 @param disk_index: Index of the snapshot to be removed 1184 1185 """ 1186 disk = self._snap_disks[disk_index] 1187 if disk and not self._removed_snaps[disk_index]: 1188 src_node = self._instance.primary_node 1189 1190 self._feedback_fn("Removing snapshot of disk/%s on node %s" % 1191 (disk_index, src_node)) 1192 1193 result = self._lu.rpc.call_blockdev_remove(src_node, disk) 1194 if result.fail_msg: 1195 self._lu.LogWarning("Could not remove snapshot for disk/%d from node" 1196 " %s: %s", disk_index, src_node, result.fail_msg) 1197 else: 1198 self._removed_snaps[disk_index] = True
1199
def LocalExport(self, dest_node):
  """Exports the instance's disk snapshots to a node of this cluster.

  Every snapshot becomes one L{DiskTransfer} from the export script
  (C{constants.IEIO_SCRIPT}) to a file (C{constants.IEIO_FILE}) below
  C{constants.EXPORT_DIR}/I{<instance name>}.new. Disks without a snapshot
  get a C{None} transfer. Afterwards the export is finalized on
  C{dest_node}.

  @type dest_node: L{objects.Node}
  @param dest_node: Destination node
  @rtype: tuple
  @return: C{(finalize_ok, disk_results)}, where C{finalize_ok} tells
    whether the finalize RPC succeeded and C{disk_results} is the
    per-disk result list returned by L{TransferInstanceData}

  """
  inst = self._instance
  src_node = inst.primary_node

  assert len(self._snap_disks) == len(inst.disks)

  transfers = []
  for (idx, snap) in enumerate(self._snap_disks):
    if not snap:
      # No snapshot could be taken for this disk, so there is nothing to send
      transfers.append(None)
      continue

    dest_path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % inst.name,
                               snap.physical_id[1])
    on_done = compat.partial(self._TransferFinished, idx)

    # FIXME: pass debug option from opcode to backend
    transfers.append(DiskTransfer("snapshot/%s" % idx,
                                  constants.IEIO_SCRIPT, (snap, idx),
                                  constants.IEIO_FILE, (dest_path, ),
                                  on_done))

  # Actually export data
  disk_results = TransferInstanceData(self._lu, self._feedback_fn,
                                      src_node, dest_node.name,
                                      dest_node.secondary_ip,
                                      inst, transfers)

  assert len(disk_results) == len(inst.disks)

  self._feedback_fn("Finalizing export on %s" % dest_node.name)
  fin_result = self._lu.rpc.call_finalize_export(dest_node.name, inst,
                                                 self._snap_disks)
  fin_error = fin_result.fail_msg
  if fin_error:
    self._lu.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", inst.name, dest_node.name,
                        fin_error)

  return (not fin_error, disk_results)
1249
def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
  """Exports the instance's disks to another cluster.

  Adds one L{DiskExport} per disk to an L{ImportExportLoop} and runs it.
  The loop is finalized even if adding or running the exports fails.

  @type disk_info: list
  @param disk_info: Per-disk destination information, one
    C{(host, port, magic)} tuple per disk
  @type key_name: string
  @param key_name: Name of X509 key to use
  @type dest_ca_pem: string
  @param dest_ca_pem: Destination X509 CA in PEM format
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @rtype: tuple
  @return: C{(True, disk_results)} with the per-disk results collected by
    the callbacks

  """
  inst = self._instance
  disk_count = len(inst.disks)

  assert len(disk_info) == disk_count

  cbs = _RemoteExportCb(self._feedback_fn, disk_count)

  loop = ImportExportLoop(self._lu)
  try:
    # NOTE(review): this reads from inst.disks (the instance's own devices)
    # rather than from self._snap_disks, which LocalExport uses; here the
    # snapshots only seem to be involved through _TransferFinished. Confirm
    # that exporting the live devices is intended.
    for (idx, (dev, dest)) in enumerate(zip(inst.disks, disk_info)):
      (host, port, magic) = dest

      opts = objects.ImportExportOptions(key_name=key_name,
                                         ca_pem=dest_ca_pem,
                                         magic=magic)

      self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
      on_done = compat.partial(self._TransferFinished, idx)
      loop.Add(DiskExport(self._lu, inst.primary_node,
                          opts, host, port, inst,
                          constants.IEIO_SCRIPT, (dev, idx),
                          timeouts, cbs, private=(idx, on_done)))

    loop.Run()
  finally:
    loop.FinalizeAll()

  return (True, cbs.disk_results)
1289
def _TransferFinished(self, idx):
  """Callback run once the transfer of a single disk has finished.

  Logs the event and removes that disk's snapshot via C{_RemoveSnapshot}.

  @type idx: number
  @param idx: Disk index

  """
  logging.debug("Transfer %s finished", idx)
  remove_snapshot = self._RemoveSnapshot
  remove_snapshot(idx)
1299
def Cleanup(self):
  """Removes all snapshots that were created and not yet removed.

  Only disks with an entry in C{self._snap_disks} are visited. This makes the
  method safe to call when L{CreateSnapshots} was never run or stopped
  before processing every disk. Disks whose snapshot failed or was already
  removed are skipped by C{_RemoveSnapshot}.

  """
  assert len(self._removed_snaps) == len(self._instance.disks)

  # Bound the loop by the recorded snapshots, not by the instance's disk
  # count: _RemoveSnapshot indexes self._snap_disks, which would raise
  # IndexError for disks that never got an entry.
  for idx in range(len(self._snap_disks)):
    self._RemoveSnapshot(idx)
1307
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for the per-disk imports of a remote (inter-cluster) import.

  Collects per-disk results. Once every disk's import daemon is listening,
  it reports the HMAC-signed connection details and the X509 CA through the
  feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)

    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk state, None until reported: transfer result and the
    # (port, magic) pair of the listening daemon
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # One salt shared by the HMACs of all disks' connection details
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results (C{None} for disks not yet finished).

    """
    return self._dresults

  def _CheckAllListening(self):
    """Sends the connection details once all daemons are listening.

    Does nothing while at least one disk has no recorded daemon port.

    """
    if None in self._daemon_port:
      return

    host = self._external_address
    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt, idx, host,
                                         port, magic)
             for (idx, (port, magic)) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Records the daemon's port and magic once it starts listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None
    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private
    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Records the outcome of one disk's transfer.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      message = "Disk %s finished receiving data" % idx
    else:
      message = (("Disk %s failed to receive data: %s"
                  " (recent output: %r)") %
                 (idx, ie.final_message, ie.recent_output))
    self._feedback_fn(message)

    self._dresults[idx] = bool(ie.success)
1405
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  An X509 key and certificate are created on the instance's primary node.
  The certificate is signed with the cluster domain secret, and one
  L{DiskImport} per disk is run in an L{ImportExportLoop}. The key and
  certificate are removed again in a C{finally} clause, whether or not the
  import succeeded.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results as collected by the callbacks

  """
  ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Create crypto key
  create_result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                               constants.RIE_CERT_VALIDITY)
  create_result.Raise("Can't create X509 key and certificate on %s" %
                      create_result.node)

  (key_name, cert_pem) = create_result.payload
  try:
    # Load and sign the certificate
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)
    signed_cert_pem = utils.SignX509Certificate(cert, cds,
                                                utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_cert_pem,
                          len(instance.disks), instance.primary_node)

    loop = ImportExportLoop(lu)
    try:
      for (idx, dev) in enumerate(instance.disks):
        # Import daemon options; the magic is derived per disk
        opts = objects.ImportExportOptions(
          key_name=key_name, ca_pem=ca_pem,
          magic=_GetInstDiskMagic(magic_base, instance.name, idx))

        loop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                            constants.IEIO_SCRIPT, (dev, idx),
                            timeouts, cbs, private=(idx, )))

      loop.Run()
    finally:
      loop.FinalizeAll()
  finally:
    # Remove crypto key and certificate.
    # NOTE(review): if Raise below raises (presumably on RPC failure) while
    # an exception from the import is already propagating, Python replaces
    # that exception with this one and the original error is lost.
    remove_result = lu.rpc.call_x509_cert_remove(instance.primary_node,
                                                 key_name)
    remove_result.Raise("Can't remove X509 key and certificate on %s" %
                        remove_result.node)

  return cbs.disk_results
1468
def _GetImportExportHandshakeMessage(version):
  """Builds the text that is HMAC-signed in the import/export handshake.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string
  @return: C{version} and C{constants.RIE_HANDSHAKE} joined by a colon

  """
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
1477
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: C{(version, hmac_digest, hmac_salt)} for the local protocol
    version, in the shape L{CheckRemoteExportHandshake} unpacks

  """
  version = constants.RIE_VERSION
  salt = utils.GenerateSecret(8)
  digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                          salt=salt)
  return (version, digest, salt)
1489
1490 1491 -def CheckRemoteExportHandshake(cds, handshake):
1492 """Checks the handshake of a remote import/export. 1493 1494 @type cds: string 1495 @param cds: Cluster domain secret 1496 @type handshake: sequence 1497 @param handshake: Handshake sent by remote peer 1498 1499 """ 1500 try: 1501 (version, hmac_digest, hmac_salt) = handshake 1502 except (TypeError, ValueError), err: 1503 return "Invalid data: %s" % err 1504 1505 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), 1506 hmac_digest, salt=hmac_salt): 1507 return "Hash didn't match, clusters don't share the same domain secret" 1508 1509 if version != constants.RIE_VERSION: 1510 return ("Clusters don't have the same remote import/export protocol" 1511 " (local=%s, remote=%s)" % 1512 (constants.RIE_VERSION, version)) 1513 1514 return None
1515
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  The four fields are joined by colons, e.g. C{"0:host.example.com:1234:abc"}.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  fields = (disk_index, host, port, magic)
  return ":".join(["%s" % (field, ) for field in fields])
1531
1532 1533 -def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1534 """Verifies received disk information for an export. 1535 1536 @type cds: string 1537 @param cds: Cluster domain secret 1538 @type disk_index: number 1539 @param disk_index: Index of disk (included in hash) 1540 @type disk_info: sequence 1541 @param disk_info: Disk information sent by remote peer 1542 1543 """ 1544 try: 1545 (host, port, magic, hmac_digest, hmac_salt) = disk_info 1546 except (TypeError, ValueError), err: 1547 raise errors.GenericError("Invalid data: %s" % err) 1548 1549 if not (host and port and magic): 1550 raise errors.GenericError("Missing destination host, port or magic") 1551 1552 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1553 1554 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): 1555 raise errors.GenericError("HMAC is wrong") 1556 1557 return (netutils.Hostname.GetNormalizedName(host), 1558 utils.ValidateServiceName(port), 1559 magic)
1560
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: C{(host, port, magic, hmac_digest, salt)}, the shape unpacked by
    L{CheckRemoteExportDiskInfo}

  """
  signed_text = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, signed_text, salt=salt)
  return (host, port, magic, digest, salt)
1582