Package ganeti :: Package masterd :: Module instance
[hide private]
[frames] | no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
  36  from ganeti import pathutils 
37 38 39 -class _ImportExportError(Exception):
40 """Local exception to report import/export errors. 41 42 """
43
44 45 -class ImportExportTimeouts(object):
46 #: Time until daemon starts writing status file 47 DEFAULT_READY_TIMEOUT = 10 48 49 #: Length of time until errors cause hard failure 50 DEFAULT_ERROR_TIMEOUT = 10 51 52 #: Time after which daemon must be listening 53 DEFAULT_LISTEN_TIMEOUT = 10 54 55 #: Progress update interval 56 DEFAULT_PROGRESS_INTERVAL = 60 57 58 __slots__ = [ 59 "error", 60 "ready", 61 "listen", 62 "connect", 63 "progress", 64 ] 65
66 - def __init__(self, connect, 67 listen=DEFAULT_LISTEN_TIMEOUT, 68 error=DEFAULT_ERROR_TIMEOUT, 69 ready=DEFAULT_READY_TIMEOUT, 70 progress=DEFAULT_PROGRESS_INTERVAL):
71 """Initializes this class. 72 73 @type connect: number 74 @param connect: Timeout for establishing connection 75 @type listen: number 76 @param listen: Timeout for starting to listen for connections 77 @type error: number 78 @param error: Length of time until errors cause hard failure 79 @type ready: number 80 @param ready: Timeout for daemon to become ready 81 @type progress: number 82 @param progress: Progress update interval 83 84 """ 85 self.error = error 86 self.ready = ready 87 self.listen = listen 88 self.connect = connect 89 self.progress = progress
90
91 92 -class ImportExportCbBase(object):
93 """Callbacks for disk import/export. 94 95 """
96 - def ReportListening(self, ie, private, component):
97 """Called when daemon started listening. 98 99 @type ie: Subclass of L{_DiskImportExportBase} 100 @param ie: Import/export object 101 @param private: Private data passed to import/export object 102 @param component: transfer component name 103 104 """
105
106 - def ReportConnected(self, ie, private):
107 """Called when a connection has been established. 108 109 @type ie: Subclass of L{_DiskImportExportBase} 110 @param ie: Import/export object 111 @param private: Private data passed to import/export object 112 113 """
114
115 - def ReportProgress(self, ie, private):
116 """Called when new progress information should be reported. 117 118 @type ie: Subclass of L{_DiskImportExportBase} 119 @param ie: Import/export object 120 @param private: Private data passed to import/export object 121 122 """
123
124 - def ReportFinished(self, ie, private):
125 """Called when a transfer has finished. 126 127 @type ie: Subclass of L{_DiskImportExportBase} 128 @param ie: Import/export object 129 @param private: Private data passed to import/export object 130 131 """
132
133 134 -class _DiskImportExportBase(object):
135 MODE_TEXT = None 136
137 - def __init__(self, lu, node_uuid, opts, 138 instance, component, timeouts, cbs, private=None):
139 """Initializes this class. 140 141 @param lu: Logical unit instance 142 @type node_uuid: string 143 @param node_uuid: Node UUID for import 144 @type opts: L{objects.ImportExportOptions} 145 @param opts: Import/export daemon options 146 @type instance: L{objects.Instance} 147 @param instance: Instance object 148 @type component: string 149 @param component: which part of the instance is being imported 150 @type timeouts: L{ImportExportTimeouts} 151 @param timeouts: Timeouts for this import 152 @type cbs: L{ImportExportCbBase} 153 @param cbs: Callbacks 154 @param private: Private data for callback functions 155 156 """ 157 assert self.MODE_TEXT 158 159 self._lu = lu 160 self.node_uuid = node_uuid 161 self.node_name = lu.cfg.GetNodeName(node_uuid) 162 self._opts = opts.Copy() 163 self._instance = instance 164 self._component = component 165 self._timeouts = timeouts 166 self._cbs = cbs 167 self._private = private 168 169 # Set master daemon's timeout in options for import/export daemon 170 assert self._opts.connect_timeout is None 171 self._opts.connect_timeout = timeouts.connect 172 173 # Parent loop 174 self._loop = None 175 176 # Timestamps 177 self._ts_begin = None 178 self._ts_connected = None 179 self._ts_finished = None 180 self._ts_cleanup = None 181 self._ts_last_progress = None 182 self._ts_last_error = None 183 184 # Transfer status 185 self.success = None 186 self.final_message = None 187 188 # Daemon status 189 self._daemon_name = None 190 self._daemon = None
191 192 @property
193 - def recent_output(self):
194 """Returns the most recent output from the daemon. 195 196 """ 197 if self._daemon: 198 return "\n".join(self._daemon.recent_output) 199 200 return None
201 202 @property
203 - def progress(self):
204 """Returns transfer progress information. 205 206 """ 207 if not self._daemon: 208 return None 209 210 return (self._daemon.progress_mbytes, 211 self._daemon.progress_throughput, 212 self._daemon.progress_percent, 213 self._daemon.progress_eta)
214 215 @property
216 - def magic(self):
217 """Returns the magic value for this import/export. 218 219 """ 220 return self._opts.magic
221 222 @property
223 - def active(self):
224 """Determines whether this transport is still active. 225 226 """ 227 return self.success is None
228 229 @property
230 - def loop(self):
231 """Returns parent loop. 232 233 @rtype: L{ImportExportLoop} 234 235 """ 236 return self._loop
237
238 - def SetLoop(self, loop):
239 """Sets the parent loop. 240 241 @type loop: L{ImportExportLoop} 242 243 """ 244 if self._loop: 245 raise errors.ProgrammerError("Loop can only be set once") 246 247 self._loop = loop
248
249 - def _StartDaemon(self):
250 """Starts the import/export daemon. 251 252 """ 253 raise NotImplementedError()
254
255 - def CheckDaemon(self):
256 """Checks whether daemon has been started and if not, starts it. 257 258 @rtype: string 259 @return: Daemon name 260 261 """ 262 assert self._ts_cleanup is None 263 264 if self._daemon_name is None: 265 assert self._ts_begin is None 266 267 result = self._StartDaemon() 268 if result.fail_msg: 269 raise _ImportExportError("Failed to start %s on %s: %s" % 270 (self.MODE_TEXT, self.node_name, 271 result.fail_msg)) 272 273 daemon_name = result.payload 274 275 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name, 276 self.node_name) 277 278 self._ts_begin = time.time() 279 self._daemon_name = daemon_name 280 281 return self._daemon_name
282
283 - def GetDaemonName(self):
284 """Returns the daemon name. 285 286 """ 287 assert self._daemon_name, "Daemon has not been started" 288 assert self._ts_cleanup is None 289 return self._daemon_name
290
291 - def Abort(self):
292 """Sends SIGTERM to import/export daemon (if still active). 293 294 """ 295 if self._daemon_name: 296 self._lu.LogWarning("Aborting %s '%s' on %s", 297 self.MODE_TEXT, self._daemon_name, self.node_uuid) 298 result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name) 299 if result.fail_msg: 300 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s", 301 self.MODE_TEXT, self._daemon_name, 302 self.node_uuid, result.fail_msg) 303 return False 304 305 return True
306
307 - def _SetDaemonData(self, data):
308 """Internal function for updating status daemon data. 309 310 @type data: L{objects.ImportExportStatus} 311 @param data: Daemon status data 312 313 """ 314 assert self._ts_begin is not None 315 316 if not data: 317 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready): 318 raise _ImportExportError("Didn't become ready after %s seconds" % 319 self._timeouts.ready) 320 321 return False 322 323 self._daemon = data 324 325 return True
326
327 - def SetDaemonData(self, success, data):
328 """Updates daemon status data. 329 330 @type success: bool 331 @param success: Whether fetching data was successful or not 332 @type data: L{objects.ImportExportStatus} 333 @param data: Daemon status data 334 335 """ 336 if not success: 337 if self._ts_last_error is None: 338 self._ts_last_error = time.time() 339 340 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error): 341 raise _ImportExportError("Too many errors while updating data") 342 343 return False 344 345 self._ts_last_error = None 346 347 return self._SetDaemonData(data)
348
349 - def CheckListening(self):
350 """Checks whether the daemon is listening. 351 352 """ 353 raise NotImplementedError()
354
355 - def _GetConnectedCheckEpoch(self):
356 """Returns timeout to calculate connect timeout. 357 358 """ 359 raise NotImplementedError()
360
361 - def CheckConnected(self):
362 """Checks whether the daemon is connected. 363 364 @rtype: bool 365 @return: Whether the daemon is connected 366 367 """ 368 assert self._daemon, "Daemon status missing" 369 370 if self._ts_connected is not None: 371 return True 372 373 if self._daemon.connected: 374 self._ts_connected = time.time() 375 376 # TODO: Log remote peer 377 logging.debug("%s '%s' on %s is now connected", 378 self.MODE_TEXT, self._daemon_name, self.node_uuid) 379 380 self._cbs.ReportConnected(self, self._private) 381 382 return True 383 384 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(), 385 self._timeouts.connect): 386 raise _ImportExportError("Not connected after %s seconds" % 387 self._timeouts.connect) 388 389 return False
390
391 - def _CheckProgress(self):
392 """Checks whether a progress update should be reported. 393 394 """ 395 if ((self._ts_last_progress is None or 396 utils.TimeoutExpired(self._ts_last_progress, 397 self._timeouts.progress)) and 398 self._daemon and 399 self._daemon.progress_mbytes is not None and 400 self._daemon.progress_throughput is not None): 401 self._cbs.ReportProgress(self, self._private) 402 self._ts_last_progress = time.time()
403
404 - def CheckFinished(self):
405 """Checks whether the daemon exited. 406 407 @rtype: bool 408 @return: Whether the transfer is finished 409 410 """ 411 assert self._daemon, "Daemon status missing" 412 413 if self._ts_finished: 414 return True 415 416 if self._daemon.exit_status is None: 417 # TODO: Adjust delay for ETA expiring soon 418 self._CheckProgress() 419 return False 420 421 self._ts_finished = time.time() 422 423 self._ReportFinished(self._daemon.exit_status == 0, 424 self._daemon.error_message) 425 426 return True
427
428 - def _ReportFinished(self, success, message):
429 """Transfer is finished or daemon exited. 430 431 @type success: bool 432 @param success: Whether the transfer was successful 433 @type message: string 434 @param message: Error message 435 436 """ 437 assert self.success is None 438 439 self.success = success 440 self.final_message = message 441 442 if success: 443 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT, 444 self._daemon_name, self.node_uuid) 445 elif self._daemon_name: 446 self._lu.LogWarning("%s '%s' on %s failed: %s", 447 self.MODE_TEXT, self._daemon_name, 448 self._lu.cfg.GetNodeName(self.node_uuid), 449 message) 450 else: 451 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT, 452 self._lu.cfg.GetNodeName(self.node_uuid), message) 453 454 self._cbs.ReportFinished(self, self._private)
455
456 - def _Finalize(self):
457 """Makes the RPC call to finalize this import/export. 458 459 """ 460 return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461
462 - def Finalize(self, error=None):
463 """Finalizes this import/export. 464 465 """ 466 if self._daemon_name: 467 logging.info("Finalizing %s '%s' on %s", 468 self.MODE_TEXT, self._daemon_name, self.node_uuid) 469 470 result = self._Finalize() 471 if result.fail_msg: 472 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s", 473 self.MODE_TEXT, self._daemon_name, 474 self.node_uuid, result.fail_msg) 475 return False 476 477 # Daemon is no longer running 478 self._daemon_name = None 479 self._ts_cleanup = time.time() 480 481 if error: 482 self._ReportFinished(False, error) 483 484 return True
485
486 487 -class DiskImport(_DiskImportExportBase):
488 MODE_TEXT = "import" 489
490 - def __init__(self, lu, node_uuid, opts, instance, component, 491 dest, dest_args, timeouts, cbs, private=None):
492 """Initializes this class. 493 494 @param lu: Logical unit instance 495 @type node_uuid: string 496 @param node_uuid: Node name for import 497 @type opts: L{objects.ImportExportOptions} 498 @param opts: Import/export daemon options 499 @type instance: L{objects.Instance} 500 @param instance: Instance object 501 @type component: string 502 @param component: which part of the instance is being imported 503 @param dest: I/O destination 504 @param dest_args: I/O arguments 505 @type timeouts: L{ImportExportTimeouts} 506 @param timeouts: Timeouts for this import 507 @type cbs: L{ImportExportCbBase} 508 @param cbs: Callbacks 509 @param private: Private data for callback functions 510 511 """ 512 _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance, 513 component, timeouts, cbs, private) 514 self._dest = dest 515 self._dest_args = dest_args 516 517 # Timestamps 518 self._ts_listening = None
519 520 @property
521 - def listen_port(self):
522 """Returns the port the daemon is listening on. 523 524 """ 525 if self._daemon: 526 return self._daemon.listen_port 527 528 return None
529
530 - def _StartDaemon(self):
531 """Starts the import daemon. 532 533 """ 534 return self._lu.rpc.call_import_start(self.node_uuid, self._opts, 535 self._instance, self._component, 536 (self._dest, self._dest_args))
537
538 - def CheckListening(self):
539 """Checks whether the daemon is listening. 540 541 @rtype: bool 542 @return: Whether the daemon is listening 543 544 """ 545 assert self._daemon, "Daemon status missing" 546 547 if self._ts_listening is not None: 548 return True 549 550 port = self._daemon.listen_port 551 if port is not None: 552 self._ts_listening = time.time() 553 554 logging.debug("Import '%s' on %s is now listening on port %s", 555 self._daemon_name, self.node_uuid, port) 556 557 self._cbs.ReportListening(self, self._private, self._component) 558 559 return True 560 561 if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen): 562 raise _ImportExportError("Not listening after %s seconds" % 563 self._timeouts.listen) 564 565 return False
566
567 - def _GetConnectedCheckEpoch(self):
568 """Returns the time since we started listening. 569 570 """ 571 assert self._ts_listening is not None, \ 572 ("Checking whether an import is connected is only useful" 573 " once it's been listening") 574 575 return self._ts_listening
576
577 578 -class DiskExport(_DiskImportExportBase):
579 MODE_TEXT = "export" 580
581 - def __init__(self, lu, node_uuid, opts, dest_host, dest_port, 582 instance, component, source, source_args, 583 timeouts, cbs, private=None):
584 """Initializes this class. 585 586 @param lu: Logical unit instance 587 @type node_uuid: string 588 @param node_uuid: Node UUID for import 589 @type opts: L{objects.ImportExportOptions} 590 @param opts: Import/export daemon options 591 @type dest_host: string 592 @param dest_host: Destination host name or IP address 593 @type dest_port: number 594 @param dest_port: Destination port number 595 @type instance: L{objects.Instance} 596 @param instance: Instance object 597 @type component: string 598 @param component: which part of the instance is being imported 599 @param source: I/O source 600 @param source_args: I/O source 601 @type timeouts: L{ImportExportTimeouts} 602 @param timeouts: Timeouts for this import 603 @type cbs: L{ImportExportCbBase} 604 @param cbs: Callbacks 605 @param private: Private data for callback functions 606 607 """ 608 _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance, 609 component, timeouts, cbs, private) 610 self._dest_host = dest_host 611 self._dest_port = dest_port 612 self._source = source 613 self._source_args = source_args
614
615 - def _StartDaemon(self):
616 """Starts the export daemon. 617 618 """ 619 return self._lu.rpc.call_export_start(self.node_uuid, self._opts, 620 self._dest_host, self._dest_port, 621 self._instance, self._component, 622 (self._source, self._source_args))
623
624 - def CheckListening(self):
625 """Checks whether the daemon is listening. 626 627 """ 628 # Only an import can be listening 629 return True
630
631 - def _GetConnectedCheckEpoch(self):
632 """Returns the time since the daemon started. 633 634 """ 635 assert self._ts_begin is not None 636 637 return self._ts_begin
638
639 640 -def FormatProgress(progress):
641 """Formats progress information for user consumption 642 643 """ 644 (mbytes, throughput, percent, eta) = progress 645 646 parts = [ 647 utils.FormatUnit(mbytes, "h"), 648 649 # Not using FormatUnit as it doesn't support kilobytes 650 "%0.1f MiB/s" % throughput, 651 ] 652 653 if percent is not None: 654 parts.append("%d%%" % percent) 655 656 if eta is not None: 657 parts.append("ETA %s" % utils.FormatSeconds(eta)) 658 659 return utils.CommaJoin(parts)
660
661 662 -class ImportExportLoop:
663 MIN_DELAY = 1.0 664 MAX_DELAY = 20.0 665
666 - def __init__(self, lu):
667 """Initializes this class. 668 669 """ 670 self._lu = lu 671 self._queue = [] 672 self._pending_add = []
673
674 - def Add(self, diskie):
675 """Adds an import/export object to the loop. 676 677 @type diskie: Subclass of L{_DiskImportExportBase} 678 @param diskie: Import/export object 679 680 """ 681 assert diskie not in self._pending_add 682 assert diskie.loop is None 683 684 diskie.SetLoop(self) 685 686 # Adding new objects to a staging list is necessary, otherwise the main 687 # loop gets confused if callbacks modify the queue while the main loop is 688 # iterating over it. 689 self._pending_add.append(diskie)
690 691 @staticmethod
692 - def _CollectDaemonStatus(lu, daemons):
693 """Collects the status for all import/export daemons. 694 695 """ 696 daemon_status = {} 697 698 for node_name, names in daemons.iteritems(): 699 result = lu.rpc.call_impexp_status(node_name, names) 700 if result.fail_msg: 701 lu.LogWarning("Failed to get daemon status on %s: %s", 702 node_name, result.fail_msg) 703 continue 704 705 assert len(names) == len(result.payload) 706 707 daemon_status[node_name] = dict(zip(names, result.payload)) 708 709 return daemon_status
710 711 @staticmethod
712 - def _GetActiveDaemonNames(queue):
713 """Gets the names of all active daemons. 714 715 """ 716 result = {} 717 for diskie in queue: 718 if not diskie.active: 719 continue 720 721 try: 722 # Start daemon if necessary 723 daemon_name = diskie.CheckDaemon() 724 except _ImportExportError, err: 725 logging.exception("%s failed", diskie.MODE_TEXT) 726 diskie.Finalize(error=str(err)) 727 continue 728 729 result.setdefault(diskie.node_name, []).append(daemon_name) 730 731 assert len(queue) >= len(result) 732 assert len(queue) >= sum([len(names) for names in result.itervalues()]) 733 734 logging.debug("daemons=%r", result) 735 736 return result
737
738 - def _AddPendingToQueue(self):
739 """Adds all pending import/export objects to the internal queue. 740 741 """ 742 assert compat.all(diskie not in self._queue and diskie.loop == self 743 for diskie in self._pending_add) 744 745 self._queue.extend(self._pending_add) 746 747 del self._pending_add[:]
748
749 - def Run(self):
750 """Utility main loop. 751 752 """ 753 while True: 754 self._AddPendingToQueue() 755 756 # Collect all active daemon names 757 daemons = self._GetActiveDaemonNames(self._queue) 758 if not daemons: 759 break 760 761 # Collection daemon status data 762 data = self._CollectDaemonStatus(self._lu, daemons) 763 764 # Use data 765 delay = self.MAX_DELAY 766 for diskie in self._queue: 767 if not diskie.active: 768 continue 769 770 try: 771 try: 772 all_daemon_data = data[diskie.node_name] 773 except KeyError: 774 result = diskie.SetDaemonData(False, None) 775 else: 776 result = \ 777 diskie.SetDaemonData(True, 778 all_daemon_data[diskie.GetDaemonName()]) 779 780 if not result: 781 # Daemon not yet ready, retry soon 782 delay = min(3.0, delay) 783 continue 784 785 if diskie.CheckFinished(): 786 # Transfer finished 787 diskie.Finalize() 788 continue 789 790 # Normal case: check again in 5 seconds 791 delay = min(5.0, delay) 792 793 if not diskie.CheckListening(): 794 # Not yet listening, retry soon 795 delay = min(1.0, delay) 796 continue 797 798 if not diskie.CheckConnected(): 799 # Not yet connected, retry soon 800 delay = min(1.0, delay) 801 continue 802 803 except _ImportExportError, err: 804 logging.exception("%s failed", diskie.MODE_TEXT) 805 diskie.Finalize(error=str(err)) 806 807 if not compat.any(diskie.active for diskie in self._queue): 808 break 809 810 # Wait a bit 811 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay)) 812 logging.debug("Waiting for %ss", delay) 813 time.sleep(delay)
814
815 - def FinalizeAll(self):
816 """Finalizes all pending transfers. 817 818 """ 819 success = True 820 821 for diskie in self._queue: 822 success = diskie.Finalize() and success 823 824 return success
825
826 827 -class _TransferInstCbBase(ImportExportCbBase):
828 - def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid, 829 src_cbs, dest_node_uuid, dest_ip):
830 """Initializes this class. 831 832 """ 833 ImportExportCbBase.__init__(self) 834 835 self.lu = lu 836 self.feedback_fn = feedback_fn 837 self.instance = instance 838 self.timeouts = timeouts 839 self.src_node_uuid = src_node_uuid 840 self.src_cbs = src_cbs 841 self.dest_node_uuid = dest_node_uuid 842 self.dest_ip = dest_ip
843
844 845 -class _TransferInstSourceCb(_TransferInstCbBase):
846 - def ReportConnected(self, ie, dtp):
847 """Called when a connection has been established. 848 849 """ 850 assert self.src_cbs is None 851 assert dtp.src_export == ie 852 assert dtp.dest_import 853 854 self.feedback_fn("%s is sending data on %s" % 855 (dtp.data.name, ie.node_name))
856
857 - def ReportProgress(self, ie, dtp):
858 """Called when new progress information should be reported. 859 860 """ 861 progress = ie.progress 862 if not progress: 863 return 864 865 self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866
867 - def ReportFinished(self, ie, dtp):
868 """Called when a transfer has finished. 869 870 """ 871 assert self.src_cbs is None 872 assert dtp.src_export == ie 873 assert dtp.dest_import 874 875 if ie.success: 876 self.feedback_fn("%s finished sending data" % dtp.data.name) 877 else: 878 self.feedback_fn("%s failed to send data: %s (recent output: %s)" % 879 (dtp.data.name, ie.final_message, ie.recent_output)) 880 881 dtp.RecordResult(ie.success) 882 883 cb = dtp.data.finished_fn 884 if cb: 885 cb() 886 887 # TODO: Check whether sending SIGTERM right away is okay, maybe we should 888 # give the daemon a moment to sort things out 889 if dtp.dest_import and not ie.success: 890 dtp.dest_import.Abort()
891
892 893 -class _TransferInstDestCb(_TransferInstCbBase):
894 - def ReportListening(self, ie, dtp, component):
895 """Called when daemon started listening. 896 897 """ 898 assert self.src_cbs 899 assert dtp.src_export is None 900 assert dtp.dest_import 901 assert dtp.export_opts 902 903 self.feedback_fn("%s is now listening, starting export" % dtp.data.name) 904 905 # Start export on source node 906 de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts, 907 self.dest_ip, ie.listen_port, self.instance, 908 component, dtp.data.src_io, dtp.data.src_ioargs, 909 self.timeouts, self.src_cbs, private=dtp) 910 ie.loop.Add(de) 911 912 dtp.src_export = de
913
914 - def ReportConnected(self, ie, dtp):
915 """Called when a connection has been established. 916 917 """ 918 self.feedback_fn("%s is receiving data on %s" % 919 (dtp.data.name, 920 self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921
922 - def ReportFinished(self, ie, dtp):
923 """Called when a transfer has finished. 924 925 """ 926 if ie.success: 927 self.feedback_fn("%s finished receiving data" % dtp.data.name) 928 else: 929 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" % 930 (dtp.data.name, ie.final_message, ie.recent_output)) 931 932 dtp.RecordResult(ie.success) 933 934 # TODO: Check whether sending SIGTERM right away is okay, maybe we should 935 # give the daemon a moment to sort things out 936 if dtp.src_export and not ie.success: 937 dtp.src_export.Abort()
938
939 940 -class DiskTransfer(object):
941 - def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs, 942 finished_fn):
943 """Initializes this class. 944 945 @type name: string 946 @param name: User-visible name for this transfer (e.g. "disk/0") 947 @param src_io: Source I/O type 948 @param src_ioargs: Source I/O arguments 949 @param dest_io: Destination I/O type 950 @param dest_ioargs: Destination I/O arguments 951 @type finished_fn: callable 952 @param finished_fn: Function called once transfer has finished 953 954 """ 955 self.name = name 956 957 self.src_io = src_io 958 self.src_ioargs = src_ioargs 959 960 self.dest_io = dest_io 961 self.dest_ioargs = dest_ioargs 962 963 self.finished_fn = finished_fn
964
965 966 -class _DiskTransferPrivate(object):
967 - def __init__(self, data, success, export_opts):
968 """Initializes this class. 969 970 @type data: L{DiskTransfer} 971 @type success: bool 972 973 """ 974 self.data = data 975 self.success = success 976 self.export_opts = export_opts 977 978 self.src_export = None 979 self.dest_import = None
980
981 - def RecordResult(self, success):
982 """Updates the status. 983 984 One failed part will cause the whole transfer to fail. 985 986 """ 987 self.success = self.success and success
988
989 990 -def _GetInstDiskMagic(base, instance_name, index):
991 """Computes the magic value for a disk export or import. 992 993 @type base: string 994 @param base: Random seed value (can be the same for all disks of a transfer) 995 @type instance_name: string 996 @param instance_name: Name of instance 997 @type index: number 998 @param index: Disk index 999 1000 """ 1001 h = compat.sha1_hash() 1002 h.update(str(constants.RIE_VERSION)) 1003 h.update(base) 1004 h.update(instance_name) 1005 h.update(str(index)) 1006 return h.hexdigest()
1007
1008 1009 -def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid, 1010 dest_ip, instance, all_transfers):
1011 """Transfers an instance's data from one node to another. 1012 1013 @param lu: Logical unit instance 1014 @param feedback_fn: Feedback function 1015 @type src_node_uuid: string 1016 @param src_node_uuid: Source node UUID 1017 @type dest_node_uuid: string 1018 @param dest_node_uuid: Destination node UUID 1019 @type dest_ip: string 1020 @param dest_ip: IP address of destination node 1021 @type instance: L{objects.Instance} 1022 @param instance: Instance object 1023 @type all_transfers: list of L{DiskTransfer} instances 1024 @param all_transfers: List of all disk transfers to be made 1025 @rtype: list 1026 @return: List with a boolean (True=successful, False=failed) for success for 1027 each transfer 1028 1029 """ 1030 # Disable compression for all moves as these are all within the same cluster 1031 compress = constants.IEC_NONE 1032 1033 src_node_name = lu.cfg.GetNodeName(src_node_uuid) 1034 dest_node_name = lu.cfg.GetNodeName(dest_node_uuid) 1035 1036 logging.debug("Source node %s, destination node %s, compression '%s'", 1037 src_node_name, dest_node_name, compress) 1038 1039 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) 1040 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, 1041 src_node_uuid, None, dest_node_uuid, dest_ip) 1042 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts, 1043 src_node_uuid, src_cbs, dest_node_uuid, 1044 dest_ip) 1045 1046 all_dtp = [] 1047 1048 base_magic = utils.GenerateSecret(6) 1049 1050 ieloop = ImportExportLoop(lu) 1051 try: 1052 for idx, transfer in enumerate(all_transfers): 1053 if transfer: 1054 feedback_fn("Exporting %s from %s to %s" % 1055 (transfer.name, src_node_name, dest_node_name)) 1056 1057 magic = _GetInstDiskMagic(base_magic, instance.name, idx) 1058 opts = objects.ImportExportOptions(key_name=None, ca_pem=None, 1059 compress=compress, magic=magic) 1060 1061 dtp = _DiskTransferPrivate(transfer, True, opts) 1062 1063 di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx, 1064 transfer.dest_io, transfer.dest_ioargs, 1065 timeouts, dest_cbs, private=dtp) 1066 ieloop.Add(di) 1067 1068 dtp.dest_import = di 1069 else: 1070 dtp = _DiskTransferPrivate(None, False, None) 1071 1072 all_dtp.append(dtp) 1073 1074 ieloop.Run() 1075 finally: 1076 ieloop.FinalizeAll() 1077 1078 assert len(all_dtp) == len(all_transfers) 1079 assert compat.all((dtp.src_export is None or 1080 dtp.src_export.success is not None) and 1081 (dtp.dest_import is None or 1082 dtp.dest_import.success is not None) 1083 for dtp in all_dtp), \ 1084 "Not all imports/exports are finalized" 1085 1086 return [bool(dtp.success) for dtp in all_dtp]
1087
1088 1089 -class _RemoteExportCb(ImportExportCbBase):
1090 - def __init__(self, feedback_fn, disk_count):
1091 """Initializes this class. 1092 1093 """ 1094 ImportExportCbBase.__init__(self) 1095 self._feedback_fn = feedback_fn 1096 self._dresults = [None] * disk_count
1097 1098 @property
1099 - def disk_results(self):
1100 """Returns per-disk results. 1101 1102 """ 1103 return self._dresults
1104
1105 - def ReportConnected(self, ie, private):
1106 """Called when a connection has been established. 1107 1108 """ 1109 (idx, _) = private 1110 1111 self._feedback_fn("Disk %s is now sending data" % idx)
1112
1113 - def ReportProgress(self, ie, private):
1114 """Called when new progress information should be reported. 1115 1116 """ 1117 (idx, _) = private 1118 1119 progress = ie.progress 1120 if not progress: 1121 return 1122 1123 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1124
1125 - def ReportFinished(self, ie, private):
1126 """Called when a transfer has finished. 1127 1128 """ 1129 (idx, finished_fn) = private 1130 1131 if ie.success: 1132 self._feedback_fn("Disk %s finished sending data" % idx) 1133 else: 1134 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" % 1135 (idx, ie.final_message, ie.recent_output)) 1136 1137 self._dresults[idx] = bool(ie.success) 1138 1139 if finished_fn: 1140 finished_fn()
1141
1142 1143 -class ExportInstanceHelper:
1144 - def __init__(self, lu, feedback_fn, instance):
1145 """Initializes this class. 1146 1147 @param lu: Logical unit instance 1148 @param feedback_fn: Feedback function 1149 @type instance: L{objects.Instance} 1150 @param instance: Instance object 1151 1152 """ 1153 self._lu = lu 1154 self._feedback_fn = feedback_fn 1155 self._instance = instance 1156 1157 self._snap_disks = [] 1158 self._removed_snaps = [False] * len(instance.disks)
1159
1160 - def CreateSnapshots(self):
1161 """Creates an LVM snapshot for every disk of the instance. 1162 1163 """ 1164 assert not self._snap_disks 1165 1166 instance = self._instance 1167 src_node = instance.primary_node 1168 1169 for idx, disk in enumerate(instance.disks): 1170 self._feedback_fn("Creating a snapshot of disk/%s on node %s" % 1171 (idx, src_node)) 1172 1173 # result.payload will be a snapshot of an lvm leaf of the one we 1174 # passed 1175 result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance)) 1176 new_dev = False 1177 msg = result.fail_msg 1178 if msg: 1179 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s", 1180 idx, src_node, msg) 1181 elif (not isinstance(result.payload, (tuple, list)) or 1182 len(result.payload) != 2): 1183 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid" 1184 " result '%s'", idx, src_node, result.payload) 1185 else: 1186 disk_id = tuple(result.payload) 1187 disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy() 1188 new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size, 1189 logical_id=disk_id, physical_id=disk_id, 1190 iv_name=disk.iv_name, 1191 params=disk_params) 1192 1193 self._snap_disks.append(new_dev) 1194 1195 assert len(self._snap_disks) == len(instance.disks) 1196 assert len(self._removed_snaps) == len(instance.disks)
1197
1198 - def _RemoveSnapshot(self, disk_index):
1199 """Removes an LVM snapshot. 1200 1201 @type disk_index: number 1202 @param disk_index: Index of the snapshot to be removed 1203 1204 """ 1205 disk = self._snap_disks[disk_index] 1206 if disk and not self._removed_snaps[disk_index]: 1207 src_node = self._instance.primary_node 1208 1209 self._feedback_fn("Removing snapshot of disk/%s on node %s" % 1210 (disk_index, src_node)) 1211 1212 result = self._lu.rpc.call_blockdev_remove(src_node, disk) 1213 if result.fail_msg: 1214 self._lu.LogWarning("Could not remove snapshot for disk/%d from node" 1215 " %s: %s", disk_index, src_node, result.fail_msg) 1216 else: 1217 self._removed_snaps[disk_index] = True
1218
1219 - def LocalExport(self, dest_node):
1220 """Intra-cluster instance export. 1221 1222 @type dest_node: L{objects.Node} 1223 @param dest_node: Destination node 1224 1225 """ 1226 instance = self._instance 1227 src_node_uuid = instance.primary_node 1228 1229 assert len(self._snap_disks) == len(instance.disks) 1230 1231 transfers = [] 1232 1233 for idx, dev in enumerate(self._snap_disks): 1234 if not dev: 1235 transfers.append(None) 1236 continue 1237 1238 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name, 1239 dev.physical_id[1]) 1240 1241 finished_fn = compat.partial(self._TransferFinished, idx) 1242 1243 # FIXME: pass debug option from opcode to backend 1244 dt = DiskTransfer("snapshot/%s" % idx, 1245 constants.IEIO_SCRIPT, (dev, idx), 1246 constants.IEIO_FILE, (path, ), 1247 finished_fn) 1248 transfers.append(dt) 1249 1250 # Actually export data 1251 dresults = TransferInstanceData(self._lu, self._feedback_fn, 1252 src_node_uuid, dest_node.uuid, 1253 dest_node.secondary_ip, 1254 instance, transfers) 1255 1256 assert len(dresults) == len(instance.disks) 1257 1258 self._feedback_fn("Finalizing export on %s" % dest_node.name) 1259 result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance, 1260 self._snap_disks) 1261 msg = result.fail_msg 1262 fin_resu = not msg 1263 if msg: 1264 self._lu.LogWarning("Could not finalize export for instance %s" 1265 " on node %s: %s", instance.name, dest_node.name, msg) 1266 1267 return (fin_resu, dresults)
1268
1269 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1270 """Inter-cluster instance export. 1271 1272 @type disk_info: list 1273 @param disk_info: Per-disk destination information 1274 @type key_name: string 1275 @param key_name: Name of X509 key to use 1276 @type dest_ca_pem: string 1277 @param dest_ca_pem: Destination X509 CA in PEM format 1278 @type timeouts: L{ImportExportTimeouts} 1279 @param timeouts: Timeouts for this import 1280 1281 """ 1282 instance = self._instance 1283 1284 assert len(disk_info) == len(instance.disks) 1285 1286 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) 1287 1288 ieloop = ImportExportLoop(self._lu) 1289 try: 1290 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, 1291 disk_info)): 1292 # Decide whether to use IPv6 1293 ipv6 = netutils.IP6Address.IsValid(host) 1294 1295 opts = objects.ImportExportOptions(key_name=key_name, 1296 ca_pem=dest_ca_pem, 1297 magic=magic, ipv6=ipv6) 1298 1299 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) 1300 finished_fn = compat.partial(self._TransferFinished, idx) 1301 ieloop.Add(DiskExport(self._lu, instance.primary_node, 1302 opts, host, port, instance, "disk%d" % idx, 1303 constants.IEIO_SCRIPT, (dev, idx), 1304 timeouts, cbs, private=(idx, finished_fn))) 1305 1306 ieloop.Run() 1307 finally: 1308 ieloop.FinalizeAll() 1309 1310 return (True, cbs.disk_results)
1311
1312 - def _TransferFinished(self, idx):
1313 """Called once a transfer has finished. 1314 1315 @type idx: number 1316 @param idx: Disk index 1317 1318 """ 1319 logging.debug("Transfer %s finished", idx) 1320 self._RemoveSnapshot(idx)
1321
1322 - def Cleanup(self):
1323 """Remove all snapshots. 1324 1325 """ 1326 assert len(self._removed_snaps) == len(self._instance.disks) 1327 for idx in range(len(self._instance.disks)): 1328 self._RemoveSnapshot(idx)
1329
1330 1331 -class _RemoteImportCb(ImportExportCbBase):
1332 - def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count, 1333 external_address):
1334 """Initializes this class. 1335 1336 @type cds: string 1337 @param cds: Cluster domain secret 1338 @type x509_cert_pem: string 1339 @param x509_cert_pem: CA used for signing import key 1340 @type disk_count: number 1341 @param disk_count: Number of disks 1342 @type external_address: string 1343 @param external_address: External address of destination node 1344 1345 """ 1346 ImportExportCbBase.__init__(self) 1347 self._feedback_fn = feedback_fn 1348 self._cds = cds 1349 self._x509_cert_pem = x509_cert_pem 1350 self._disk_count = disk_count 1351 self._external_address = external_address 1352 1353 self._dresults = [None] * disk_count 1354 self._daemon_port = [None] * disk_count 1355 1356 self._salt = utils.GenerateSecret(8)
1357 1358 @property
1359 - def disk_results(self):
1360 """Returns per-disk results. 1361 1362 """ 1363 return self._dresults
1364
1365 - def _CheckAllListening(self):
1366 """Checks whether all daemons are listening. 1367 1368 If all daemons are listening, the information is sent to the client. 1369 1370 """ 1371 if not compat.all(dp is not None for dp in self._daemon_port): 1372 return 1373 1374 host = self._external_address 1375 1376 disks = [] 1377 for idx, (port, magic) in enumerate(self._daemon_port): 1378 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt, 1379 idx, host, port, magic)) 1380 1381 assert len(disks) == self._disk_count 1382 1383 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, { 1384 "disks": disks, 1385 "x509_ca": self._x509_cert_pem, 1386 })
1387
1388 - def ReportListening(self, ie, private, _):
1389 """Called when daemon started listening. 1390 1391 """ 1392 (idx, ) = private 1393 1394 self._feedback_fn("Disk %s is now listening" % idx) 1395 1396 assert self._daemon_port[idx] is None 1397 1398 self._daemon_port[idx] = (ie.listen_port, ie.magic) 1399 1400 self._CheckAllListening()
1401
1402 - def ReportConnected(self, ie, private):
1403 """Called when a connection has been established. 1404 1405 """ 1406 (idx, ) = private 1407 1408 self._feedback_fn("Disk %s is now receiving data" % idx)
1409
1410 - def ReportFinished(self, ie, private):
1411 """Called when a transfer has finished. 1412 1413 """ 1414 (idx, ) = private 1415 1416 # Daemon is certainly no longer listening 1417 self._daemon_port[idx] = None 1418 1419 if ie.success: 1420 self._feedback_fn("Disk %s finished receiving data" % idx) 1421 else: 1422 self._feedback_fn(("Disk %s failed to receive data: %s" 1423 " (recent output: %s)") % 1424 (idx, ie.final_message, ie.recent_output)) 1425 1426 self._dresults[idx] = bool(ie.success)
1427
1428 1429 -def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca, 1430 cds, timeouts):
1431 """Imports an instance from another cluster. 1432 1433 @param lu: Logical unit instance 1434 @param feedback_fn: Feedback function 1435 @type instance: L{objects.Instance} 1436 @param instance: Instance object 1437 @type pnode: L{objects.Node} 1438 @param pnode: Primary node of instance as an object 1439 @type source_x509_ca: OpenSSL.crypto.X509 1440 @param source_x509_ca: Import source's X509 CA 1441 @type cds: string 1442 @param cds: Cluster domain secret 1443 @type timeouts: L{ImportExportTimeouts} 1444 @param timeouts: Timeouts for this import 1445 1446 """ 1447 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, 1448 source_x509_ca) 1449 1450 magic_base = utils.GenerateSecret(6) 1451 1452 # Decide whether to use IPv6 1453 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip) 1454 1455 # Create crypto key 1456 result = lu.rpc.call_x509_cert_create(instance.primary_node, 1457 constants.RIE_CERT_VALIDITY) 1458 result.Raise("Can't create X509 key and certificate on %s" % result.node) 1459 1460 (x509_key_name, x509_cert_pem) = result.payload 1461 try: 1462 # Load certificate 1463 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, 1464 x509_cert_pem) 1465 1466 # Sign certificate 1467 signed_x509_cert_pem = \ 1468 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8)) 1469 1470 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem, 1471 len(instance.disks), pnode.primary_ip) 1472 1473 ieloop = ImportExportLoop(lu) 1474 try: 1475 for idx, dev in enumerate(instance.disks): 1476 magic = _GetInstDiskMagic(magic_base, instance.name, idx) 1477 1478 # Import daemon options 1479 opts = objects.ImportExportOptions(key_name=x509_key_name, 1480 ca_pem=source_ca_pem, 1481 magic=magic, ipv6=ipv6) 1482 1483 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance, 1484 "disk%d" % idx, 1485 constants.IEIO_SCRIPT, (dev, idx), 1486 timeouts, cbs, private=(idx, ))) 1487 1488 ieloop.Run() 1489 finally: 1490 ieloop.FinalizeAll() 1491 finally: 1492 # Remove crypto key and certificate 1493 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name) 1494 result.Raise("Can't remove X509 key and certificate on %s" % result.node) 1495 1496 return cbs.disk_results
1497
1498 1499 -def _GetImportExportHandshakeMessage(version):
1500 """Returns the handshake message for a RIE protocol version. 1501 1502 @type version: number 1503 1504 """ 1505 return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1506
1507 1508 -def ComputeRemoteExportHandshake(cds):
1509 """Computes the remote import/export handshake. 1510 1511 @type cds: string 1512 @param cds: Cluster domain secret 1513 1514 """ 1515 salt = utils.GenerateSecret(8) 1516 msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION) 1517 return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1518
1519 1520 -def CheckRemoteExportHandshake(cds, handshake):
1521 """Checks the handshake of a remote import/export. 1522 1523 @type cds: string 1524 @param cds: Cluster domain secret 1525 @type handshake: sequence 1526 @param handshake: Handshake sent by remote peer 1527 1528 """ 1529 try: 1530 (version, hmac_digest, hmac_salt) = handshake 1531 except (TypeError, ValueError), err: 1532 return "Invalid data: %s" % err 1533 1534 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), 1535 hmac_digest, salt=hmac_salt): 1536 return "Hash didn't match, clusters don't share the same domain secret" 1537 1538 if version != constants.RIE_VERSION: 1539 return ("Clusters don't have the same remote import/export protocol" 1540 " (local=%s, remote=%s)" % 1541 (constants.RIE_VERSION, version)) 1542 1543 return None
1544
1545 1546 -def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1547 """Returns the hashed text for import/export disk information. 1548 1549 @type disk_index: number 1550 @param disk_index: Index of disk (included in hash) 1551 @type host: string 1552 @param host: Hostname 1553 @type port: number 1554 @param port: Daemon port 1555 @type magic: string 1556 @param magic: Magic value 1557 1558 """ 1559 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1560
1561 1562 -def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1563 """Verifies received disk information for an export. 1564 1565 @type cds: string 1566 @param cds: Cluster domain secret 1567 @type disk_index: number 1568 @param disk_index: Index of disk (included in hash) 1569 @type disk_info: sequence 1570 @param disk_info: Disk information sent by remote peer 1571 1572 """ 1573 try: 1574 (host, port, magic, hmac_digest, hmac_salt) = disk_info 1575 except (TypeError, ValueError), err: 1576 raise errors.GenericError("Invalid data: %s" % err) 1577 1578 if not (host and port and magic): 1579 raise errors.GenericError("Missing destination host, port or magic") 1580 1581 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1582 1583 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): 1584 raise errors.GenericError("HMAC is wrong") 1585 1586 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): 1587 destination = host 1588 else: 1589 destination = netutils.Hostname.GetNormalizedName(host) 1590 1591 return (destination, 1592 utils.ValidateServiceName(port), 1593 magic)
1594
1595 1596 -def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1597 """Computes the signed disk information for a remote import. 1598 1599 @type cds: string 1600 @param cds: Cluster domain secret 1601 @type salt: string 1602 @param salt: HMAC salt 1603 @type disk_index: number 1604 @param disk_index: Index of disk (included in hash) 1605 @type host: string 1606 @param host: Hostname 1607 @type port: number 1608 @param port: Daemon port 1609 @type magic: string 1610 @param magic: Magic value 1611 1612 """ 1613 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1614 hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt) 1615 return (host, port, magic, hmac_digest, salt)
1616
1617 1618 -def CalculateGroupIPolicy(cluster, group):
1619 """Calculate instance policy for group. 1620 1621 """ 1622 return cluster.SimpleFillIPolicy(group.ipolicy)
1623
1624 1625 -def ComputeDiskSize(disk_template, disks):
1626 """Compute disk size requirements according to disk template 1627 1628 """ 1629 # Required free disk space as a function of disk and swap space 1630 req_size_dict = { 1631 constants.DT_DISKLESS: 0, 1632 constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), 1633 # 128 MB are added for drbd metadata for each disk 1634 constants.DT_DRBD8: 1635 sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks), 1636 constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks), 1637 constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks), 1638 constants.DT_BLOCK: 0, 1639 constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks), 1640 constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks), 1641 } 1642 1643 if disk_template not in req_size_dict: 1644 raise errors.ProgrammerError("Disk template '%s' size requirement" 1645 " is unknown" % disk_template) 1646 1647 return req_size_dict[disk_template]
1648