Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
36 37 38 -class _ImportExportError(Exception):
39 """Local exception to report import/export errors. 40 41 """
42
class ImportExportTimeouts(object):
  """Bundle of timeout values used when driving import/export daemons.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All hooks are no-ops by default; subclasses override the ones they need.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
132 133 -class _DiskImportExportBase(object):
134 MODE_TEXT = None 135
136 - def __init__(self, lu, node_name, opts, 137 instance, component, timeouts, cbs, private=None):
138 """Initializes this class. 139 140 @param lu: Logical unit instance 141 @type node_name: string 142 @param node_name: Node name for import 143 @type opts: L{objects.ImportExportOptions} 144 @param opts: Import/export daemon options 145 @type instance: L{objects.Instance} 146 @param instance: Instance object 147 @type component: string 148 @param component: which part of the instance is being imported 149 @type timeouts: L{ImportExportTimeouts} 150 @param timeouts: Timeouts for this import 151 @type cbs: L{ImportExportCbBase} 152 @param cbs: Callbacks 153 @param private: Private data for callback functions 154 155 """ 156 assert self.MODE_TEXT 157 158 self._lu = lu 159 self.node_name = node_name 160 self._opts = opts.Copy() 161 self._instance = instance 162 self._component = component 163 self._timeouts = timeouts 164 self._cbs = cbs 165 self._private = private 166 167 # Set master daemon's timeout in options for import/export daemon 168 assert self._opts.connect_timeout is None 169 self._opts.connect_timeout = timeouts.connect 170 171 # Parent loop 172 self._loop = None 173 174 # Timestamps 175 self._ts_begin = None 176 self._ts_connected = None 177 self._ts_finished = None 178 self._ts_cleanup = None 179 self._ts_last_progress = None 180 self._ts_last_error = None 181 182 # Transfer status 183 self.success = None 184 self.final_message = None 185 186 # Daemon status 187 self._daemon_name = None 188 self._daemon = None
189 190 @property
191 - def recent_output(self):
192 """Returns the most recent output from the daemon. 193 194 """ 195 if self._daemon: 196 return "\n".join(self._daemon.recent_output) 197 198 return None
199 200 @property
201 - def progress(self):
202 """Returns transfer progress information. 203 204 """ 205 if not self._daemon: 206 return None 207 208 return (self._daemon.progress_mbytes, 209 self._daemon.progress_throughput, 210 self._daemon.progress_percent, 211 self._daemon.progress_eta)
212 213 @property
214 - def magic(self):
215 """Returns the magic value for this import/export. 216 217 """ 218 return self._opts.magic
219 220 @property
221 - def active(self):
222 """Determines whether this transport is still active. 223 224 """ 225 return self.success is None
226 227 @property
228 - def loop(self):
229 """Returns parent loop. 230 231 @rtype: L{ImportExportLoop} 232 233 """ 234 return self._loop
235
236 - def SetLoop(self, loop):
237 """Sets the parent loop. 238 239 @type loop: L{ImportExportLoop} 240 241 """ 242 if self._loop: 243 raise errors.ProgrammerError("Loop can only be set once") 244 245 self._loop = loop
246
247 - def _StartDaemon(self):
248 """Starts the import/export daemon. 249 250 """ 251 raise NotImplementedError()
252
253 - def CheckDaemon(self):
254 """Checks whether daemon has been started and if not, starts it. 255 256 @rtype: string 257 @return: Daemon name 258 259 """ 260 assert self._ts_cleanup is None 261 262 if self._daemon_name is None: 263 assert self._ts_begin is None 264 265 result = self._StartDaemon() 266 if result.fail_msg: 267 raise _ImportExportError("Failed to start %s on %s: %s" % 268 (self.MODE_TEXT, self.node_name, 269 result.fail_msg)) 270 271 daemon_name = result.payload 272 273 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name, 274 self.node_name) 275 276 self._ts_begin = time.time() 277 self._daemon_name = daemon_name 278 279 return self._daemon_name
280
281 - def GetDaemonName(self):
282 """Returns the daemon name. 283 284 """ 285 assert self._daemon_name, "Daemon has not been started" 286 assert self._ts_cleanup is None 287 return self._daemon_name
288
289 - def Abort(self):
290 """Sends SIGTERM to import/export daemon (if still active). 291 292 """ 293 if self._daemon_name: 294 self._lu.LogWarning("Aborting %s '%s' on %s", 295 self.MODE_TEXT, self._daemon_name, self.node_name) 296 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name) 297 if result.fail_msg: 298 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s", 299 self.MODE_TEXT, self._daemon_name, 300 self.node_name, result.fail_msg) 301 return False 302 303 return True
304
305 - def _SetDaemonData(self, data):
306 """Internal function for updating status daemon data. 307 308 @type data: L{objects.ImportExportStatus} 309 @param data: Daemon status data 310 311 """ 312 assert self._ts_begin is not None 313 314 if not data: 315 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready): 316 raise _ImportExportError("Didn't become ready after %s seconds" % 317 self._timeouts.ready) 318 319 return False 320 321 self._daemon = data 322 323 return True
324
325 - def SetDaemonData(self, success, data):
326 """Updates daemon status data. 327 328 @type success: bool 329 @param success: Whether fetching data was successful or not 330 @type data: L{objects.ImportExportStatus} 331 @param data: Daemon status data 332 333 """ 334 if not success: 335 if self._ts_last_error is None: 336 self._ts_last_error = time.time() 337 338 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error): 339 raise _ImportExportError("Too many errors while updating data") 340 341 return False 342 343 self._ts_last_error = None 344 345 return self._SetDaemonData(data)
346
347 - def CheckListening(self):
348 """Checks whether the daemon is listening. 349 350 """ 351 raise NotImplementedError()
352
353 - def _GetConnectedCheckEpoch(self):
354 """Returns timeout to calculate connect timeout. 355 356 """ 357 raise NotImplementedError()
358
359 - def CheckConnected(self):
360 """Checks whether the daemon is connected. 361 362 @rtype: bool 363 @return: Whether the daemon is connected 364 365 """ 366 assert self._daemon, "Daemon status missing" 367 368 if self._ts_connected is not None: 369 return True 370 371 if self._daemon.connected: 372 self._ts_connected = time.time() 373 374 # TODO: Log remote peer 375 logging.debug("%s '%s' on %s is now connected", 376 self.MODE_TEXT, self._daemon_name, self.node_name) 377 378 self._cbs.ReportConnected(self, self._private) 379 380 return True 381 382 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(), 383 self._timeouts.connect): 384 raise _ImportExportError("Not connected after %s seconds" % 385 self._timeouts.connect) 386 387 return False
388
389 - def _CheckProgress(self):
390 """Checks whether a progress update should be reported. 391 392 """ 393 if ((self._ts_last_progress is None or 394 utils.TimeoutExpired(self._ts_last_progress, 395 self._timeouts.progress)) and 396 self._daemon and 397 self._daemon.progress_mbytes is not None and 398 self._daemon.progress_throughput is not None): 399 self._cbs.ReportProgress(self, self._private) 400 self._ts_last_progress = time.time()
401
402 - def CheckFinished(self):
403 """Checks whether the daemon exited. 404 405 @rtype: bool 406 @return: Whether the transfer is finished 407 408 """ 409 assert self._daemon, "Daemon status missing" 410 411 if self._ts_finished: 412 return True 413 414 if self._daemon.exit_status is None: 415 # TODO: Adjust delay for ETA expiring soon 416 self._CheckProgress() 417 return False 418 419 self._ts_finished = time.time() 420 421 self._ReportFinished(self._daemon.exit_status == 0, 422 self._daemon.error_message) 423 424 return True
425
426 - def _ReportFinished(self, success, message):
427 """Transfer is finished or daemon exited. 428 429 @type success: bool 430 @param success: Whether the transfer was successful 431 @type message: string 432 @param message: Error message 433 434 """ 435 assert self.success is None 436 437 self.success = success 438 self.final_message = message 439 440 if success: 441 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT, 442 self._daemon_name, self.node_name) 443 elif self._daemon_name: 444 self._lu.LogWarning("%s '%s' on %s failed: %s", 445 self.MODE_TEXT, self._daemon_name, self.node_name, 446 message) 447 else: 448 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT, 449 self.node_name, message) 450 451 self._cbs.ReportFinished(self, self._private)
452
453 - def _Finalize(self):
454 """Makes the RPC call to finalize this import/export. 455 456 """ 457 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
458
459 - def Finalize(self, error=None):
460 """Finalizes this import/export. 461 462 """ 463 if self._daemon_name: 464 logging.info("Finalizing %s '%s' on %s", 465 self.MODE_TEXT, self._daemon_name, self.node_name) 466 467 result = self._Finalize() 468 if result.fail_msg: 469 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s", 470 self.MODE_TEXT, self._daemon_name, 471 self.node_name, result.fail_msg) 472 return False 473 474 # Daemon is no longer running 475 self._daemon_name = None 476 self._ts_cleanup = time.time() 477 478 if error: 479 self._ReportFinished(False, error) 480 481 return True
482
class DiskImport(_DiskImportExportBase):
  """Wrapper around the import daemon for one disk.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon started listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
class DiskExport(_DiskImportExportBase):
  """Wrapper around the export daemon for one disk.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption

  """
  (mbytes, throughput, percent, eta) = progress

  # Not using FormatUnit for throughput as it doesn't support kilobytes
  parts = [
    utils.FormatUnit(mbytes, "h"),
    "%0.1f MiB/s" % throughput,
    ]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
class ImportExportLoop:
  """Main loop driving a set of import/export daemons to completion.

  """
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # New objects go to a staging list first; otherwise the main loop gets
    # confused if callbacks modify the queue while it's being iterated over.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.items():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError as err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum(len(names) for names in result.values())

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names; when nothing is running we're done
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Fetch daemon status data from all involved nodes
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Process the status of every active transfer
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            node_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   node_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError as err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit before polling again
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state holder for intra-cluster transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of a transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of a transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import side is ready, so start the export on the source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
class DiskTransfer(object):
  """Description of a single disk transfer.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
961 962 -class _DiskTransferPrivate(object):
963 - def __init__(self, data, success, export_opts):
964 """Initializes this class. 965 966 @type data: L{DiskTransfer} 967 @type success: bool 968 969 """ 970 self.data = data 971 self.success = success 972 self.export_opts = export_opts 973 974 self.src_export = None 975 self.dest_import = None
976
977 - def RecordResult(self, success):
978 """Updates the status. 979 980 One failed part will cause the whole transfer to fail. 981 982 """ 983 self.success = self.success and success
984
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  # SHA-1 over protocol version, seed, instance name and disk index
  digest = compat.sha1_hash()
  digest.update(str(constants.RIE_VERSION))
  digest.update(base)
  digest.update(instance_name)
  digest.update(str(index))
  return digest.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
    each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # Only the import is started here; the matching export is started by
        # the destination callbacks once the import daemon is listening
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
    "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting an instance to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk; None until the disk's transfer finished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # One entry per instance disk, filled in by L{CreateSnapshots}; a failed
    # snapshot is recorded as C{False}
    self._snap_disks = []
    # Tracks which snapshots have been removed again
    self._removed_snaps = [False for _ in instance.disks]

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Snapshot failures are only logged as warnings; the corresponding entry in
    L{_snap_disks} is set to C{False} so a partial export can still proceed.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))

      errmsg = result.fail_msg
      if errmsg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, errmsg)
        self._snap_disks.append(False)
        continue

      payload = result.payload
      if not (isinstance(payload, (tuple, list)) and len(payload) == 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, payload)
        self._snap_disks.append(False)
        continue

      disk_id = tuple(payload)
      disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
      self._snap_disks.append(objects.Disk(dev_type=constants.LD_LV,
                                           size=disk.size,
                                           logical_id=disk_id,
                                           physical_id=disk_id,
                                           iv_name=disk.iv_name,
                                           params=disk_params))

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Does nothing if no snapshot exists for the disk or it was already removed.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    snap = self._snap_disks[disk_index]
    if not snap or self._removed_snaps[disk_index]:
      return

    node_name = self._instance.primary_node

    self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                      (disk_index, node_name))

    result = self._lu.rpc.call_blockdev_remove(node_name, snap)
    errmsg = result.fail_msg
    if errmsg:
      self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", disk_index, node_name, errmsg)
    else:
      self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @return: Tuple of (finalization success, per-disk results)

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []
    for idx, snap in enumerate(self._snap_disks):
      if not snap:
        # Snapshot creation failed for this disk, nothing to transfer
        transfers.append(None)
        continue

      dest_path = utils.PathJoin(constants.EXPORT_DIR,
                                 "%s.new" % instance.name,
                                 snap.physical_id[1])

      # FIXME: pass debug option from opcode to backend
      transfers.append(DiskTransfer("snapshot/%s" % idx,
                                    constants.IEIO_SCRIPT, (snap, idx),
                                    constants.IEIO_FILE, (dest_path, ),
                                    compat.partial(self._TransferFinished,
                                                   idx)))

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    errmsg = result.fail_msg
    if errmsg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name,
                          errmsg)

    return (not errmsg, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, ((host, port, magic), dev) in enumerate(zip(disk_info,
                                                           instance.disks)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    Removes the snapshot backing the finished transfer.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx, _ in enumerate(self._instance.disks):
      self._RemoveSnapshot(idx)
1321
class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk success flags, filled in by L{ReportFinished}
    self._dresults = [None] * disk_count
    # Per-disk (port, magic) pairs while the daemons are listening
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(entry is not None for entry in self._daemon_port):
      # At least one daemon has not reported in yet
      return

    host = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt, idx, host,
                                         port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    success = bool(ie.success)
    if success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = success
1419
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load the freshly-created certificate and sign it with the cluster
    # domain secret so the source cluster can verify it
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        # Import daemon options
        opts = \
          objects.ImportExportOptions(key_name=x509_key_name,
                                      ca_pem=source_ca_pem,
                                      magic=_GetInstDiskMagic(magic_base,
                                                              instance.name,
                                                              idx),
                                      ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1489
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @return: String of the form C{"<version>:<handshake constant>"}

  """
  return ":".join([str(version), str(constants.RIE_HANDSHAKE)])
1498
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: Tuple of (protocol version, HMAC digest, salt)

  """
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = \
    utils.Sha1Hmac(cds,
                   _GetImportExportHandshakeMessage(constants.RIE_VERSION),
                   salt=hmac_salt)
  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
1510
1511 1512 -def CheckRemoteExportHandshake(cds, handshake):
1513 """Checks the handshake of a remote import/export. 1514 1515 @type cds: string 1516 @param cds: Cluster domain secret 1517 @type handshake: sequence 1518 @param handshake: Handshake sent by remote peer 1519 1520 """ 1521 try: 1522 (version, hmac_digest, hmac_salt) = handshake 1523 except (TypeError, ValueError), err: 1524 return "Invalid data: %s" % err 1525 1526 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), 1527 hmac_digest, salt=hmac_salt): 1528 return "Hash didn't match, clusters don't share the same domain secret" 1529 1530 if version != constants.RIE_VERSION: 1531 return ("Clusters don't have the same remote import/export protocol" 1532 " (local=%s, remote=%s)" % 1533 (constants.RIE_VERSION, version)) 1534 1535 return None
1536
1537 1538 -def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1539 """Returns the hashed text for import/export disk information. 1540 1541 @type disk_index: number 1542 @param disk_index: Index of disk (included in hash) 1543 @type host: string 1544 @param host: Hostname 1545 @type port: number 1546 @param port: Daemon port 1547 @type magic: string 1548 @param magic: Magic value 1549 1550 """ 1551 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1552
1553 1554 -def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1555 """Verifies received disk information for an export. 1556 1557 @type cds: string 1558 @param cds: Cluster domain secret 1559 @type disk_index: number 1560 @param disk_index: Index of disk (included in hash) 1561 @type disk_info: sequence 1562 @param disk_info: Disk information sent by remote peer 1563 1564 """ 1565 try: 1566 (host, port, magic, hmac_digest, hmac_salt) = disk_info 1567 except (TypeError, ValueError), err: 1568 raise errors.GenericError("Invalid data: %s" % err) 1569 1570 if not (host and port and magic): 1571 raise errors.GenericError("Missing destination host, port or magic") 1572 1573 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1574 1575 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): 1576 raise errors.GenericError("HMAC is wrong") 1577 1578 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): 1579 destination = host 1580 else: 1581 destination = netutils.Hostname.GetNormalizedName(host) 1582 1583 return (destination, 1584 utils.ValidateServiceName(port), 1585 magic)
1586
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: Tuple of (host, port, magic, HMAC digest, salt)

  """
  digest = \
    utils.Sha1Hmac(cds,
                   _GetRieDiskInfoMessage(disk_index, host, port, magic),
                   salt=salt)
  return (host, port, magic, digest, salt)
1608