Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Instance-related functions and classes for masterd. 
  32   
  33  """ 
  34   
  35  import logging 
  36  import time 
  37  import OpenSSL 
  38   
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import compat 
  42  from ganeti import utils 
  43  from ganeti import objects 
  44  from ganeti import netutils 
  45  from ganeti import pathutils 
46 47 48 -class _ImportExportError(Exception):
49 """Local exception to report import/export errors. 50 51 """
52
class ImportExportTimeouts(object):
  """Bundle of timeouts and intervals used while supervising a transfer.

  Only the connect timeout has to be supplied by the caller; every other
  value falls back to the class-level default.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Stores the given timeout values.

    @type connect: number
    @param connect: How long to wait for the connection to be established
    @type listen: number
    @param listen: How long to wait for the daemon to start listening
    @type error: number
    @param error: How long errors may persist before they become fatal
    @type ready: number
    @param ready: How long to wait for the daemon to become ready
    @type progress: number
    @param progress: Interval between progress reports

    """
    (self.connect, self.listen) = (connect, listen)
    (self.ready, self.error) = (ready, error)
    self.progress = progress
99
class ImportExportCbBase(object):
  """Base class for disk import/export callbacks.

  Every hook is a no-op here; subclasses override the ones they need.

  """
  def ReportListening(self, ie, private, component):
    """Hook invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Hook invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Hook invoked whenever fresh progress information should be shown.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Hook invoked once a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
141
142 143 -class _DiskImportExportBase(object):
144 MODE_TEXT = None 145
146 - def __init__(self, lu, node_uuid, opts, 147 instance, component, timeouts, cbs, private=None):
148 """Initializes this class. 149 150 @param lu: Logical unit instance 151 @type node_uuid: string 152 @param node_uuid: Node UUID for import 153 @type opts: L{objects.ImportExportOptions} 154 @param opts: Import/export daemon options 155 @type instance: L{objects.Instance} 156 @param instance: Instance object 157 @type component: string 158 @param component: which part of the instance is being imported 159 @type timeouts: L{ImportExportTimeouts} 160 @param timeouts: Timeouts for this import 161 @type cbs: L{ImportExportCbBase} 162 @param cbs: Callbacks 163 @param private: Private data for callback functions 164 165 """ 166 assert self.MODE_TEXT 167 168 self._lu = lu 169 self.node_uuid = node_uuid 170 self.node_name = lu.cfg.GetNodeName(node_uuid) 171 self._opts = opts.Copy() 172 self._instance = instance 173 self._component = component 174 self._timeouts = timeouts 175 self._cbs = cbs 176 self._private = private 177 178 # Set master daemon's timeout in options for import/export daemon 179 assert self._opts.connect_timeout is None 180 self._opts.connect_timeout = timeouts.connect 181 182 # Parent loop 183 self._loop = None 184 185 # Timestamps 186 self._ts_begin = None 187 self._ts_connected = None 188 self._ts_finished = None 189 self._ts_cleanup = None 190 self._ts_last_progress = None 191 self._ts_last_error = None 192 193 # Transfer status 194 self.success = None 195 self.final_message = None 196 197 # Daemon status 198 self._daemon_name = None 199 self._daemon = None
200 201 @property
202 - def recent_output(self):
203 """Returns the most recent output from the daemon. 204 205 """ 206 if self._daemon: 207 return "\n".join(self._daemon.recent_output) 208 209 return None
210 211 @property
212 - def progress(self):
213 """Returns transfer progress information. 214 215 """ 216 if not self._daemon: 217 return None 218 219 return (self._daemon.progress_mbytes, 220 self._daemon.progress_throughput, 221 self._daemon.progress_percent, 222 self._daemon.progress_eta)
223 224 @property
225 - def magic(self):
226 """Returns the magic value for this import/export. 227 228 """ 229 return self._opts.magic
230 231 @property
232 - def active(self):
233 """Determines whether this transport is still active. 234 235 """ 236 return self.success is None
237 238 @property
239 - def loop(self):
240 """Returns parent loop. 241 242 @rtype: L{ImportExportLoop} 243 244 """ 245 return self._loop
246
247 - def SetLoop(self, loop):
248 """Sets the parent loop. 249 250 @type loop: L{ImportExportLoop} 251 252 """ 253 if self._loop: 254 raise errors.ProgrammerError("Loop can only be set once") 255 256 self._loop = loop
257
258 - def _StartDaemon(self):
259 """Starts the import/export daemon. 260 261 """ 262 raise NotImplementedError()
263
264 - def CheckDaemon(self):
265 """Checks whether daemon has been started and if not, starts it. 266 267 @rtype: string 268 @return: Daemon name 269 270 """ 271 assert self._ts_cleanup is None 272 273 if self._daemon_name is None: 274 assert self._ts_begin is None 275 276 result = self._StartDaemon() 277 if result.fail_msg: 278 raise _ImportExportError("Failed to start %s on %s: %s" % 279 (self.MODE_TEXT, self.node_name, 280 result.fail_msg)) 281 282 daemon_name = result.payload 283 284 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name, 285 self.node_name) 286 287 self._ts_begin = time.time() 288 self._daemon_name = daemon_name 289 290 return self._daemon_name
291
292 - def GetDaemonName(self):
293 """Returns the daemon name. 294 295 """ 296 assert self._daemon_name, "Daemon has not been started" 297 assert self._ts_cleanup is None 298 return self._daemon_name
299
300 - def Abort(self):
301 """Sends SIGTERM to import/export daemon (if still active). 302 303 """ 304 if self._daemon_name: 305 self._lu.LogWarning("Aborting %s '%s' on %s", 306 self.MODE_TEXT, self._daemon_name, self.node_uuid) 307 result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name) 308 if result.fail_msg: 309 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s", 310 self.MODE_TEXT, self._daemon_name, 311 self.node_uuid, result.fail_msg) 312 return False 313 314 return True
315
316 - def _SetDaemonData(self, data):
317 """Internal function for updating status daemon data. 318 319 @type data: L{objects.ImportExportStatus} 320 @param data: Daemon status data 321 322 """ 323 assert self._ts_begin is not None 324 325 if not data: 326 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready): 327 raise _ImportExportError("Didn't become ready after %s seconds" % 328 self._timeouts.ready) 329 330 return False 331 332 self._daemon = data 333 334 return True
335
336 - def SetDaemonData(self, success, data):
337 """Updates daemon status data. 338 339 @type success: bool 340 @param success: Whether fetching data was successful or not 341 @type data: L{objects.ImportExportStatus} 342 @param data: Daemon status data 343 344 """ 345 if not success: 346 if self._ts_last_error is None: 347 self._ts_last_error = time.time() 348 349 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error): 350 raise _ImportExportError("Too many errors while updating data") 351 352 return False 353 354 self._ts_last_error = None 355 356 return self._SetDaemonData(data)
357
358 - def CheckListening(self):
359 """Checks whether the daemon is listening. 360 361 """ 362 raise NotImplementedError()
363
364 - def _GetConnectedCheckEpoch(self):
365 """Returns timeout to calculate connect timeout. 366 367 """ 368 raise NotImplementedError()
369
370 - def CheckConnected(self):
371 """Checks whether the daemon is connected. 372 373 @rtype: bool 374 @return: Whether the daemon is connected 375 376 """ 377 assert self._daemon, "Daemon status missing" 378 379 if self._ts_connected is not None: 380 return True 381 382 if self._daemon.connected: 383 self._ts_connected = time.time() 384 385 # TODO: Log remote peer 386 logging.debug("%s '%s' on %s is now connected", 387 self.MODE_TEXT, self._daemon_name, self.node_uuid) 388 389 self._cbs.ReportConnected(self, self._private) 390 391 return True 392 393 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(), 394 self._timeouts.connect): 395 raise _ImportExportError("Not connected after %s seconds" % 396 self._timeouts.connect) 397 398 return False
399
400 - def _CheckProgress(self):
401 """Checks whether a progress update should be reported. 402 403 """ 404 if ((self._ts_last_progress is None or 405 utils.TimeoutExpired(self._ts_last_progress, 406 self._timeouts.progress)) and 407 self._daemon and 408 self._daemon.progress_mbytes is not None and 409 self._daemon.progress_throughput is not None): 410 self._cbs.ReportProgress(self, self._private) 411 self._ts_last_progress = time.time()
412
413 - def CheckFinished(self):
414 """Checks whether the daemon exited. 415 416 @rtype: bool 417 @return: Whether the transfer is finished 418 419 """ 420 assert self._daemon, "Daemon status missing" 421 422 if self._ts_finished: 423 return True 424 425 if self._daemon.exit_status is None: 426 # TODO: Adjust delay for ETA expiring soon 427 self._CheckProgress() 428 return False 429 430 self._ts_finished = time.time() 431 432 self._ReportFinished(self._daemon.exit_status == 0, 433 self._daemon.error_message) 434 435 return True
436
437 - def _ReportFinished(self, success, message):
438 """Transfer is finished or daemon exited. 439 440 @type success: bool 441 @param success: Whether the transfer was successful 442 @type message: string 443 @param message: Error message 444 445 """ 446 assert self.success is None 447 448 self.success = success 449 self.final_message = message 450 451 if success: 452 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT, 453 self._daemon_name, self.node_uuid) 454 elif self._daemon_name: 455 self._lu.LogWarning("%s '%s' on %s failed: %s", 456 self.MODE_TEXT, self._daemon_name, 457 self._lu.cfg.GetNodeName(self.node_uuid), 458 message) 459 else: 460 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT, 461 self._lu.cfg.GetNodeName(self.node_uuid), message) 462 463 self._cbs.ReportFinished(self, self._private)
464
465 - def _Finalize(self):
466 """Makes the RPC call to finalize this import/export. 467 468 """ 469 return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
470
471 - def Finalize(self, error=None):
472 """Finalizes this import/export. 473 474 """ 475 if self._daemon_name: 476 logging.info("Finalizing %s '%s' on %s", 477 self.MODE_TEXT, self._daemon_name, self.node_uuid) 478 479 result = self._Finalize() 480 if result.fail_msg: 481 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s", 482 self.MODE_TEXT, self._daemon_name, 483 self.node_uuid, result.fail_msg) 484 return False 485 486 # Daemon is no longer running 487 self._daemon_name = None 488 self._ts_cleanup = time.time() 489 490 if error: 491 self._ReportFinished(False, error) 492 493 return True
494
class DiskImport(_DiskImportExportBase):
  """Supervises a single import daemon, i.e. the receiving side.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_uuid, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port from the daemon status, or C{None} while no daemon status
      is available

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: if the listen timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      # Log the node name (as elsewhere in the base class), not the UUID
      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
585
class DiskExport(_DiskImportExportBase):
  """Supervises a single export daemon, i.e. the sending side.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Sets up the export and remembers where the data has to go.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private=private)

    (self._dest_host, self._dest_port) = (dest_host, dest_port)
    (self._source, self._source_args) = (source, source_args)

  def _StartDaemon(self):
    """Issues the RPC call starting the export daemon.

    """
    source_spec = (self._source, self._source_args)
    start_fn = self._lu.rpc.call_export_start

    return start_fn(self.node_uuid, self._opts,
                    self._dest_host, self._dest_port,
                    self._instance, self._component, source_spec)

  def CheckListening(self):
    """Always reports success, as exports never listen.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the daemon start time as base for the connect timeout.

    """
    epoch = self._ts_begin
    assert epoch is not None

    return epoch
647
def FormatProgress(progress):
  """Renders a progress tuple as a short, human-readable string.

  @type progress: tuple
  @param progress: (mbytes, throughput, percent, eta); the last two may be
    C{None}, in which case they are left out of the result

  """
  (mbytes, throughput, percent, eta) = progress

  # Throughput is formatted by hand as FormatUnit doesn't support kilobytes
  fields = [utils.FormatUnit(mbytes, "h")]
  fields.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    fields.append("%d%%" % percent)

  if eta is not None:
    fields.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(fields)
669
class ImportExportLoop(object):
  """Main loop polling and driving a set of import/export objects.

  """
  #: Lower bound for the delay between two polling rounds
  MIN_DELAY = 1.0
  #: Upper bound for the delay between two polling rounds
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @type daemons: dict
    @param daemons: Daemon names per node, as returned by
      L{_GetActiveDaemonNames}
    @rtype: dict
    @return: For every node whose status RPC succeeded, a dictionary mapping
      daemon name to status; nodes with a failed RPC are left out

    """
    daemon_status = {}

    for node_name, names in daemons.items():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Daemons which have not been started yet are started; objects failing to
    start are finalized with the error and left out of the result.

    @rtype: dict
    @return: Node name as key, list of daemon names as value

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError as err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum(len(names) for names in result.values())

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert all(diskie not in self._queue and diskie.loop == self
               for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Runs until no object in the queue is active anymore. The delay between
    two rounds depends on the least advanced transfer and is clamped to
    [L{MIN_DELAY}, L{MAX_DELAY}].

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError as err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether finalizing succeeded for every queued object

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
834
class _TransferInstCbBase(ImportExportCbBase):
  """Common state for the callbacks used by L{TransferInstanceData}.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
               src_cbs, dest_node_uuid, dest_ip):
    """Stores everything the source and destination callbacks need.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @param instance: Instance object
    @param timeouts: Timeouts passed on to newly created exports
    @param src_node_uuid: Source node UUID
    @param src_cbs: Callbacks for the source side; C{None} on the source
      callback object itself
    @param dest_node_uuid: Destination node UUID
    @param dest_ip: IP address of the destination node

    """
    ImportExportCbBase.__init__(self)

    # General context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node_uuid = src_node_uuid
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node_uuid = dest_node_uuid
    self.dest_ip = dest_ip
852
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the exporting (source) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Tells the user that the export started sending data.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    message = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(message)

  def ReportProgress(self, ie, dtp):
    """Forwards progress information, if any, to the user.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" %
                       (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Records the export result and aborts the import on failure.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    transfer = dtp.data

    if ie.success:
      self.feedback_fn("%s finished sending data" % transfer.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (transfer.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # Per-transfer completion hook, called regardless of the outcome
    finished_fn = transfer.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
900
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the importing (destination) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Starts the matching export once the import is listening.

    The new export is added to the loop of the import and remembered in the
    transfer's private data.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    transfer = dtp.data

    self.feedback_fn("%s is now listening, starting export" % transfer.name)

    # Start export on source node
    export = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
                        self.dest_ip, ie.listen_port, self.instance,
                        component, transfer.src_io, transfer.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Tells the user that the import started receiving data.

    """
    dest_node_name = self.lu.cfg.GetNodeName(self.dest_node_uuid)

    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, dest_node_name))

  def ReportFinished(self, ie, dtp):
    """Records the import result and aborts the export on failure.

    """
    if not ie.success:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
947
class DiskTransfer(object):
  """Description of one disk transfer: where to read and where to write.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Stores the description of the transfer.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    (self.src_io, self.src_ioargs) = (src_io, src_ioargs)
    (self.dest_io, self.dest_ioargs) = (dest_io, dest_ioargs)
973
974 975 -class _DiskTransferPrivate(object):
976 - def __init__(self, data, success, export_opts):
977 """Initializes this class. 978 979 @type data: L{DiskTransfer} 980 @type success: bool 981 982 """ 983 self.data = data 984 self.success = success 985 self.export_opts = export_opts 986 987 self.src_export = None 988 self.dest_import = None
989
990 - def RecordResult(self, success):
991 """Updates the status. 992 993 One failed part will cause the whole transfer to fail. 994 995 """ 996 self.success = self.success and success
997
def _GetInstDiskMagic(base, instance_name, index):
  """Derives the magic value identifying one disk export or import.

  The value is the hex digest of a SHA1 hash over the remote import/export
  version, the seed, the instance name and the disk index, in that order.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  digest = compat.sha1_hash()

  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(part)

  return digest.hexdigest()
1016
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
                         dest_ip, compress, instance, all_transfers):
  """Copies the data of an instance from one node to another.

  An import is set up on the destination node for every transfer; the
  matching export is started by the destination callbacks as soon as the
  import is listening. Entries of C{all_transfers} evaluating to false are
  skipped and reported as failed.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node_uuid: string
  @param src_node_uuid: Source node UUID
  @type dest_node_uuid: string
  @param dest_node_uuid: Destination node UUID
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type compress: string
  @param compress: Compression tool to use
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node_name, dest_node_name, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)

  # The destination callbacks need the source ones to start the exports
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node_uuid, src_cbs, dest_node_uuid,
                                 dest_ip)

  privates = []

  # One seed is shared by all disks of this transfer
  seed = utils.GenerateSecret(6)

  loop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if not transfer:
        # Nothing to transfer for this slot, counts as failed
        privates.append(_DiskTransferPrivate(None, False, None))
        continue

      feedback_fn("Exporting %s from %s to %s" %
                  (transfer.name, src_node_name, dest_node_name))

      magic = _GetInstDiskMagic(seed, instance.name, idx)
      opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                         compress=compress, magic=magic)

      private = _DiskTransferPrivate(transfer, True, opts)

      disk_import = DiskImport(lu, dest_node_uuid, opts, instance,
                               "disk%d" % idx,
                               transfer.dest_io, transfer.dest_ioargs,
                               timeouts, dest_cbs, private=private)
      loop.Add(disk_import)

      private.dest_import = disk_import

      privates.append(private)

    loop.Run()
  finally:
    loop.FinalizeAll()

  assert len(privates) == len(all_transfers)
  assert compat.all((private.src_export is None or
                     private.src_export.success is not None) and
                    (private.dest_import is None or
                     private.dest_import.success is not None)
                    for private in privates), \
         "Not all imports/exports are finalized"

  return [bool(private.success) for private in privates]
1095
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for remote exports, collecting one result per disk.

  The private data passed to the hooks is a tuple of (disk index, function
  to call when finished).

  """
  def __init__(self, feedback_fn, disk_count):
    """Prepares an empty result slot for each disk.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._dresults = [None] * disk_count
    self._feedback_fn = feedback_fn

  @property
  def disk_results(self):
    """Returns the list of per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Tells the user that a disk started sending data.

    """
    (idx, _) = private

    message = "Disk %s is now sending data" % idx
    self._feedback_fn(message)

  def ReportProgress(self, ie, private):
    """Forwards progress information, if any, to the user.

    """
    (idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Stores the result for the disk and calls its completion function.

    """
    (idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % idx)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1149
class ExportInstanceHelper(object):
  """Helper for exporting the disks of an instance.

  Keeps one snapshot slot per instance disk, together with a flag telling
  whether that snapshot has already been removed again.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    disk_count = len(instance.disks)
    self._snapshots = [None] * disk_count
    self._snapshots_removed = [False] * disk_count

  def _SnapshotsReady(self):
    """Tells whether every disk has a snapshot usable for an export.

    """
    return all(self._snapshots)

  def CreateSnapshots(self):
    """Attempts to create a snapshot for every disk of the instance.

    Currently support drbd, plain and ext disk templates.

    @rtype: bool
    @return: Whether following transfers can use snapshots
    @raise errors.ProgrammerError: If snapshots have been created before

    """
    if any(self._snapshots):
      raise errors.ProgrammerError("Snapshot creation was invoked more than "
                                   "once")

    lu = self._lu
    instance = self._instance
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)

    # Bail out early unless every single disk can be snapshotted
    snapshot_support = [disk.SupportsSnapshots() for disk in inst_disks]
    if not all(snapshot_support):
      return False

    node_uuid = instance.primary_node
    node_name = lu.cfg.GetNodeName(node_uuid)

    for idx, disk in enumerate(inst_disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, node_name))

      # On success the payload describes a snapshot of an lvm leaf of the
      # disk which was passed in
      result = lu.rpc.call_blockdev_snapshot(node_uuid, (disk, instance),
                                             None, None)
      fail_msg = result.fail_msg
      if fail_msg:
        lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                      idx, node_name, fail_msg)
        continue

      payload = result.payload
      if not (isinstance(payload, (tuple, list)) and len(payload) == 2):
        lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                      " result '%s'", idx, node_name, payload)
        continue

      snapshot_id = tuple(payload)

      # Snapshot is currently supported for ExtStorage and LogicalVolume.
      # A snapshot of any non-ext disk (e.g. drbd) is of type plain.
      snapshot_type = (constants.DT_EXT
                       if disk.dev_type == constants.DT_EXT
                       else constants.DT_PLAIN)
      snapshot_params = constants.DISK_LD_DEFAULTS[snapshot_type].copy()

      snapshot = objects.Disk(dev_type=snapshot_type, size=disk.size,
                              logical_id=snapshot_id, iv_name=disk.iv_name,
                              params=snapshot_params)
      snapshot.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())

      self._snapshots[idx] = snapshot
      self._snapshots_removed[idx] = False

    # Final check whether everything could be snapshotted
    if self._SnapshotsReady():
      return True

    # An incomplete set of snapshots is of little value, hence whatever was
    # created is removed straight away
    self.Cleanup()
    return False

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Nothing is done if the disk has no snapshot or if the snapshot has
    already been removed. A failed removal is only logged as a warning.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    snapshot = self._snapshots[disk_index]
    if snapshot is None or self._snapshots_removed[disk_index]:
      return

    node_uuid = self._instance.primary_node
    node_name = self._lu.cfg.GetNodeName(node_uuid)

    self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                      (disk_index, node_name))

    result = self._lu.rpc.call_blockdev_remove(node_uuid,
                                               (snapshot, self._instance))
    if result.fail_msg:
      self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", disk_index, node_name,
                          result.fail_msg)
      return

    self._snapshots_removed[disk_index] = True

  def _GetDisksToTransfer(self):
    """Returns disks to be transferred, whether snapshots or instance disks.

    @rtype: list of L{objects.Disk}
    @return: The snapshots if all of them are ready, otherwise the disks of
      the instance as found in the configuration

    """
    if not self._SnapshotsReady():
      return self._lu.cfg.GetInstanceDisks(self._instance.uuid)

    return self._snapshots

  def _GetDiskLabel(self, idx):
    """Returns a label which should be used to represent a disk to transfer.

    @type idx: int
    @param idx: The disk index
    @rtype: string

    """
    if not self._SnapshotsReady():
      return "disk/%d" % idx

    return "snapshot/%d" % idx

  def LocalExport(self, dest_node, compress):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @type compress: string
    @param compress: Compression tool to use
    @rtype: tuple
    @return: Whether the export was finalized successfully and the list of
      per-disk transfer results

    """
    instance = self._instance
    export_disks = self._GetDisksToTransfer()
    src_node_uuid = instance.primary_node

    transfers = []
    for idx, dev in enumerate(export_disks):
      dest_path = utils.PathJoin(pathutils.EXPORT_DIR,
                                 "%s.new" % instance.name, dev.uuid)

      finished_fn = compat.partial(self._TransferFinished, idx)

      # The export script is used for instances having an OS, everything
      # else is read as a raw disk
      if instance.os:
        (src_io, src_ioargs) = (constants.IEIO_SCRIPT, ((dev, instance), idx))
      else:
        (src_io, src_ioargs) = (constants.IEIO_RAW_DISK, (dev, instance))

      # FIXME: pass debug option from opcode to backend
      transfers.append(DiskTransfer(self._GetDiskLabel(idx),
                                    src_io, src_ioargs,
                                    constants.IEIO_FILE, (dest_path, ),
                                    finished_fn))

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node_uuid, dest_node.uuid,
                                    dest_node.secondary_ip,
                                    compress,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    # The export is finalized only if all disks were exported successfully
    if not all(dresults):
      self._lu.LogWarning("Some disk exports have failed; there may be "
                          "leftover data for instance %s on node %s",
                          instance.name, dest_node.name)
      return (False, dresults)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
                                               export_disks)
    fail_msg = result.fail_msg
    if fail_msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name,
                          fail_msg)

    return (not fail_msg, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, compress, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type compress: string
    @param compress: Compression tool to use
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @rtype: tuple
    @return: C{True} and the list of per-disk results

    """
    instance = self._instance
    export_disks = self._GetDisksToTransfer()

    assert len(disk_info) == len(export_disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(export_disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      disks_and_dests = zip(export_disks, disk_info)
      for idx, (dev, destination) in enumerate(disks_and_dests):
        (host, port, magic) = destination

        # Decide whether to use IPv6
        use_ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic,
                                           compress=compress,
                                           ipv6=use_ipv6)

        if instance.os:
          (src_io, src_ioargs) = (constants.IEIO_SCRIPT,
                                  ((dev, instance), idx))
        else:
          (src_io, src_ioargs) = (constants.IEIO_RAW_DISK, (dev, instance))

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        export = DiskExport(self._lu, instance.primary_node,
                            opts, host, port, instance, "disk%d" % idx,
                            src_io, src_ioargs,
                            timeouts, cbs, private=(idx, finished_fn))
        ieloop.Add(export)

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    The snapshot of the disk is removed at this point.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    for idx, _ in enumerate(self._snapshots):
      self._RemoveSnapshot(idx)
1422
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks used while importing disks from a remote source.

  Once the daemons of all disks are listening, the signed connection
  information is handed to the feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk state: the transfer result and, while the daemon is
    # listening, a (port, magic) tuple
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    all_listening = compat.all(entry is not None
                               for entry in self._daemon_port)
    if not all_listening:
      return

    address = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         idx, address, port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    import_info = {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      }
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, import_info)

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    @param private: One-element tuple containing the disk index

    """
    (disk_idx, ) = private

    self._feedback_fn("Disk %s is now listening" % disk_idx)

    assert self._daemon_port[disk_idx] is None

    self._daemon_port[disk_idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, ) = private
    self._feedback_fn("Disk %s is now receiving data" % disk_idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[disk_idx] = None

    if ie.success:
      message = "Disk %s finished receiving data" % disk_idx
    else:
      message = ("Disk %s failed to receive data: %s (recent output: %s)" %
                 (disk_idx, ie.final_message, ie.recent_output))
    self._feedback_fn(message)

    self._dresults[disk_idx] = bool(ie.success)
1520
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, compress, timeouts):
  """Imports an instance from another cluster.

  A temporary X509 key and certificate are created on the instance's primary
  node for the duration of the import and removed afterwards, even if the
  import fails.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type compress: string
  @param compress: Compression tool to use
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results as collected by the import callbacks

  """
  pem_format = OpenSSL.crypto.FILETYPE_PEM
  source_ca_pem = OpenSSL.crypto.dump_certificate(pem_format, source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  use_ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  create_result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                               constants.RIE_CERT_VALIDITY)
  create_result.Raise("Can't create X509 key and certificate on %s" %
                      create_result.node)

  (x509_key_name, x509_cert_pem) = create_result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(pem_format, x509_cert_pem)

    # Sign certificate
    cert_salt = utils.GenerateSecret(8)
    signed_x509_cert_pem = utils.SignX509Certificate(x509_cert, cds,
                                                     cert_salt)

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    try:
      for idx, dev in enumerate(inst_disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic,
                                           compress=compress,
                                           ipv6=use_ipv6)

        # The import script is used for instances having an OS, everything
        # else is written as a raw disk
        if instance.os:
          (dest_io, dest_ioargs) = (constants.IEIO_SCRIPT,
                                    ((dev, instance), idx))
        else:
          (dest_io, dest_ioargs) = (constants.IEIO_RAW_DISK, (dev, instance))

        disk_import = DiskImport(lu, instance.primary_node, opts, instance,
                                 "disk%d" % idx,
                                 dest_io, dest_ioargs,
                                 timeouts, cbs, private=(idx, ))
        ieloop.Add(disk_import)

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    remove_result = lu.rpc.call_x509_cert_remove(instance.primary_node,
                                                 x509_key_name)
    remove_result.Raise("Can't remove X509 key and certificate on %s" %
                        remove_result.node)

  return cbs.disk_results
1602
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Protocol version to be embedded in the message
  @rtype: string
  @return: The version and the handshake constant, separated by a colon

  """
  message_fields = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % message_fields
1611
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Protocol version, HMAC digest of the handshake message and the
    salt used for computing the digest

  """
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  hmac_digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)

  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
1623
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer; expected to consist of
    protocol version, HMAC digest and HMAC salt
  @rtype: string or None
  @return: An error message if the handshake is not acceptable, C{None}
    otherwise

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError) as err:
    # Not a sequence of exactly three elements
    return "Invalid data: %s" % err

  # The digest covers the version announced by the peer, hence it has to be
  # verified using that version before the version itself is compared
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
1649
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: string
  @return: The four values joined by colons, in the order of the parameters

  """
  message_fields = (disk_index, host, port, magic)
  return "%s:%s:%s:%s" % message_fields
1665
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer; expected to
    consist of host, port, magic, HMAC digest and HMAC salt
  @rtype: tuple
  @return: Destination host, validated port and magic value
  @raise errors.GenericError: If the data is malformed or incomplete or if
    its HMAC does not match

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError) as err:
    # Not a sequence of exactly five elements
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # IP addresses are used as received, anything else is treated as a
  # hostname and normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
1699
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: Host, port, magic, HMAC digest and the salt which was passed in

  """
  signed_text = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, signed_text, salt=salt)

  return (host, port, magic, digest, salt)
1721
def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  @param cluster: Object providing C{SimpleFillIPolicy}
  @param group: Object whose C{ipolicy} attribute is passed to the cluster
  @return: Whatever the cluster's C{SimpleFillIPolicy} returns for the
    group's policy

  """
  fill_policy = cluster.SimpleFillIPolicy
  return fill_policy(group.ipolicy)
1728
def ComputeDiskSize(disks):
  """Compute disk size requirements according to disk template

  @param disks: Iterable of disk definitions, each indexable by
    C{constants.IDISK_TYPE} and C{constants.IDISK_SIZE}
  @return: Sum of the sizes required by all disks
  @raise errors.ProgrammerError: If a disk uses a template for which no
    size requirement is known

  """
  def _RequiredSize(disk):
    """Returns the size required by a single disk.

    """
    template = disk[constants.IDISK_TYPE]
    size = disk[constants.IDISK_SIZE]

    # Templates requiring exactly the size of the disk
    requirements = dict.fromkeys([
      constants.DT_PLAIN,
      constants.DT_FILE,
      constants.DT_SHARED_FILE,
      constants.DT_GLUSTER,
      constants.DT_RBD,
      constants.DT_EXT,
      ], size)
    # Templates without any size requirement
    requirements[constants.DT_DISKLESS] = 0
    requirements[constants.DT_BLOCK] = 0
    # Extra space for drbd metadata is added to each disk
    requirements[constants.DT_DRBD8] = size + constants.DRBD_META_SIZE

    if template in requirements:
      return requirements[template]

    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % template)

  return sum(_RequiredSize(disk) for disk in disks)