Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
36 37 38 -class _ImportExportError(Exception):
39 """Local exception to report import/export errors. 40 41 """
42
class ImportExportTimeouts(object):
  """Container for the timeout values of a single import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Stores all timeout values.

    @type connect: number
    @param connect: Timeout for establishing a connection
    @type listen: number
    @param listen: Timeout until the daemon listens for connections
    @type error: number
    @param error: How long errors may persist before causing a hard failure
    @type ready: number
    @param ready: Timeout until the daemon becomes ready
    @type progress: number
    @param progress: Interval between progress updates

    """
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
89
class ImportExportCbBase(object):
  """Base class defining the callback interface for disk imports/exports.

  Subclasses override the notification methods they care about; the default
  implementations do nothing.

  """
  def ReportListening(self, ie, private, component):
    """Invoked once the daemon is listening for connections.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection was established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Invoked when updated progress information is available.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once a transfer has completed.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
131
132 133 -def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
134 """Checks whether a timeout has expired. 135 136 """ 137 return _time_fn() > (epoch + timeout)
138
class _DiskImportExportBase(object):
  """Base class tracking the life cycle of one disk import/export daemon.

  Tracks daemon start, status polling, listen/connect/finish transitions and
  finalization; concrete behavior is provided by L{DiskImport} and
  L{DiskExport}.

  """
  #: Human-readable mode name; must be overridden by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Copy options as connect_timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status (None while the transfer is still active)
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} if no daemon was
      running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      # No status file yet; fail hard once the ready timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      # Record first failure; repeated failures beyond the error timeout are
      # treated as a hard failure
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    # Only report when the progress interval has elapsed and the daemon has
    # actual progress numbers
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message marking the transfer as failed

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None

      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
487
class DiskImport(_DiskImportExportBase):
  """Disk import: starts an import daemon on the destination node and waits
  for it to listen for an incoming connection.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
578
class DiskExport(_DiskImportExportBase):
  """Disk export: starts an export daemon that connects to a waiting import
  daemon on the destination.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          self._source, self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
640
def FormatProgress(progress):
  """Renders a progress tuple as a human-readable string.

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, eta), e.g. as
    returned by L{_DiskImportExportBase.progress}; percent and eta may be
    C{None}
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  # Not using FormatUnit as it doesn't support kilobytes
  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  formatted = utils.CommaJoin(parts)
  return formatted
662
class ImportExportLoop:
  """Main loop driving a set of concurrent disk imports/exports.

  Transfers are added via L{Add} and driven to completion by L{Run}, which
  polls the daemons' status until all transfers are done.

  """
  #: Lower bound for the polling delay, in seconds
  MIN_DELAY = 1.0
  #: Upper bound for the polling delay, in seconds
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @type daemons: dict
    @param daemons: Node name to list of daemon names

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    @rtype: dict
    @return: Node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all active transfers and sleeps between iterations; returns once
    no transfer is active anymore.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
827
class _TransferInstCbBase(ImportExportCbBase):
  """Base class for instance transfer callbacks; stores shared context.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for the transfer
    @param src_node: Source node name
    @param src_cbs: Source-side callbacks (C{None} on the source side itself)
    @param dest_node: Destination node name
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
845
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance data transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
893
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance data transfer.

  Once the import daemon is listening, the matching export is started on the
  source node.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
939
class DiskTransfer(object):
  """Describes one disk transfer from a source to a destination.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Stores the transfer description.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
965
966 967 -class _DiskTransferPrivate(object):
968 - def __init__(self, data, success, export_opts):
969 """Initializes this class. 970 971 @type data: L{DiskTransfer} 972 @type success: bool 973 974 """ 975 self.data = data 976 self.success = success 977 self.export_opts = export_opts 978 979 self.src_export = None 980 self.dest_import = None
981
982 - def RecordResult(self, success):
983 """Updates the status. 984 985 One failed part will cause the whole transfer to fail. 986 987 """ 988 self.success = self.success and success
989
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  hasher = compat.sha1_hash()
  # Feed all components in a fixed order so the value is deterministic
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    hasher.update(part)
  return hasher.hexdigest()
1008
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  # Shared random seed for all per-disk magic values
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # The matching export is started by _TransferInstDestCb once the
        # import daemon is listening
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for skipped disks
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
1084
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks collecting per-disk results of a remote export.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks for which results are collected

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk; None until the disk's transfer finished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1138
class ExportInstanceHelper:
  """Drives the disk-level part of an instance export.

  Intended workflow: L{CreateSnapshots} takes an LVM snapshot of every
  instance disk, then either L{LocalExport} or L{RemoteExport} copies the
  snapshot data out. Snapshots are removed as soon as their transfer has
  finished; L{Cleanup} removes any that remain.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Per-disk snapshot devices, filled in by L{CreateSnapshots}; an entry
    # is C{False} when snapshotting that disk failed
    self._snap_disks = []
    # Tracks, per disk index, whether the snapshot was already removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Failed snapshots are recorded as C{False} in L{_snap_disks} so that
    list indices stay aligned with C{instance.disks}.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        # Payload must be the (vg, lv) pair identifying the new volume
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Does nothing if the snapshot was never created or was already removed
    (the method is safe to call more than once per disk).

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # Removal failed; leave the flag unset so a later call can retry
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @return: tuple of (whether finalizing the export succeeded, per-disk
      transfer results)

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot failed for this disk; keep a placeholder so indices match
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @return: tuple of (C{True}, per-disk results collected by the callbacks)

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    The snapshot is no longer needed once its data has been sent, so it
    is removed right away.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    # _RemoveSnapshot skips snapshots that were already removed
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
1324
class _RemoteImportCb(ImportExportCbBase):
  """Import/export callbacks for a remote (inter-cluster) instance import.

  Collects the listening port and magic value of every import daemon and,
  once all daemons are listening, sends the signed connection information
  to the client via the feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk results, None until the respective transfer has finished
    self._dresults = [None] * disk_count
    # Per-disk (listen_port, magic) tuples; None while not listening
    self._daemon_port = [None] * disk_count

    # Salt used when signing the per-disk information sent to the client
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      # At least one daemon is not listening yet
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1422
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  Creates a temporary X509 key/certificate on the instance's primary node,
  signs it with the cluster domain secret, starts one import daemon per
  disk and reports connection details back through C{feedback_fn} (via
  L{_RemoteImportCb}). The temporary key and certificate are removed
  again in all cases.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results (see L{_RemoteImportCb.disk_results})

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        # Per-disk magic value derived from magic_base
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1492
def _GetImportExportHandshakeMessage(version):
  """Builds the cleartext handshake message for a RIE protocol version.

  The message is the version number and the handshake constant, separated
  by a colon.

  @type version: number
  @param version: Remote import/export protocol version

  """
  return ":".join([str(version), str(constants.RIE_HANDSHAKE)])
1501
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: tuple of (protocol version, HMAC digest, HMAC salt)

  """
  hmac_salt = utils.GenerateSecret(8)
  plaintext = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, plaintext, salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)
1513
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  Verifies first that the HMAC matches (proving both clusters share the
  same domain secret) and only then that the protocol versions agree.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: C{None} if the handshake is valid, otherwise a string
    describing the problem

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # "except X as err" (PEP 3110) instead of the legacy "except X, err"
  # comma form, which is a syntax error under Python 3
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
1539
1540 1541 -def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1542 """Returns the hashed text for import/export disk information. 1543 1544 @type disk_index: number 1545 @param disk_index: Index of disk (included in hash) 1546 @type host: string 1547 @param host: Hostname 1548 @type port: number 1549 @param port: Daemon port 1550 @type magic: string 1551 @param magic: Magic value 1552 1553 """ 1554 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1555
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @return: tuple of (destination host or normalized hostname, validated
    port, magic value)
  @raise errors.GenericError: If the data is malformed, incomplete or the
    HMAC doesn't verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # "except X as err" (PEP 3110) instead of the legacy "except X, err"
  # comma form, which is a syntax error under Python 3
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # Literal IP addresses are used as-is, hostnames are normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
1589
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: tuple of (host, port, magic, HMAC digest, salt)

  """
  hashed_text = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, hashed_text, salt=salt)
  return (host, port, magic, digest, salt)
1611