Package ganeti :: Package masterd :: Module instance
[hide private]
[frames | no frames]

Source Code for Module ganeti.masterd.instance

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Instance-related functions and classes for masterd. 
  23   
  24  """ 
  25   
  26  import logging 
  27  import time 
  28  import OpenSSL 
  29   
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import compat 
  33  from ganeti import utils 
  34  from ganeti import objects 
  35  from ganeti import netutils 
  36  from ganeti import pathutils 
class _ImportExportError(Exception):
  """Signals the failure of a single disk import or export.

  This exception is local to this module: it is raised by the import/export
  objects when a daemon can not be started or a timeout has expired and is
  caught by the code driving them.

  """
43
class ImportExportTimeouts(object):
  """Container for the timeouts and intervals of one import/export.

  Only the connect timeout has to be given explicitly, everything else falls
  back to the C{DEFAULT_*} values defined on this class.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Stores the given timeout values.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Stored in the same order as the parameters are declared
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
90
class ImportExportCbBase(object):
  """Base class for disk import/export callbacks.

  All callbacks do nothing by default; subclasses override the ones they are
  interested in.

  """
  def ReportListening(self, ie, private, component):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Invoked when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
132
class _DiskImportExportBase(object):
  """Base class tracking one import/export daemon running on a node.

  Subclasses have to set L{MODE_TEXT} and to implement L{_StartDaemon},
  L{CheckListening} and L{_GetConnectedCheckEpoch}. Status data is fed in via
  L{SetDaemonData}; the C{Check*} methods evaluate it, invoke the callbacks
  and raise L{_ImportExportError} once one of the timeouts has expired.

  """
  #: Mode name used in log and error messages, must be set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Work on a copy as the connect timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @return: Output lines joined by newlines or None if there is no daemon
      status data yet

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: Tuple of (mbytes, throughput, percent, eta) as found in the
      daemon status data or None if there is no status data yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transfer is still active.

    A transfer is active as long as no final result has been recorded.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop has been set already

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses and return the RPC result of the call
    starting the daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # The ready and (for exports) connect timeouts are based on this time
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    The daemon must have been started and not yet been cleaned up.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: False if the abort RPC call failed, True otherwise (also if
      there was no daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available and has been stored
    @raise _ImportExportError: If there is still no status data after the
      ready timeout

    """
    assert self._ts_begin is not None

    if not data:
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available and has been stored
    @raise _ImportExportError: If fetching the data kept failing for longer
      than the error timeout or the daemon didn't become ready in time

    """
    if not success:
      if self._ts_last_error is None:
        # First failure in a row, start measuring the error timeout
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # Fetching worked, forget about earlier failures
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the timestamp from which the connect timeout is measured.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If the connect timeout has expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    The progress callback is invoked if either nothing has been reported yet
    or the progress interval has passed and the daemon status contains both
    the transferred size and the throughput.

    """
    if ((self._ts_last_progress is None or
         utils.TimeoutExpired(self._ts_last_progress,
                              self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    # Compared against None like all other timestamps in this class
    if self._ts_finished is not None:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    # Only an exit status of zero counts as success
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    Records the final result, logs it and invokes the finished callback. Must
    be called only once per transfer.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @type error: string or None
    @param error: If not None, the transfer is reported as failed with this
      message after the daemon has been cleaned up
    @rtype: bool
    @return: False if the cleanup RPC call failed (in which case nothing is
      reported), True otherwise

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    # An empty message is still an error; testing for truth here would leave
    # the transfer marked as active although its daemon is gone already
    if error is not None:
      self._ReportFinished(False, error)

    return True
483
class DiskImport(_DiskImportExportBase):
  """Import side of a disk transfer.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Sets up an import without starting its daemon.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    super(DiskImport, self).__init__(lu, node_name, opts, instance,
                                     component, timeouts, cbs, private)

    self._dest_args = dest_args
    self._dest = dest

    # Time at which the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port from the daemon status or None if there is no status data

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    destination = (self._dest, self._dest_args)

    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          destination)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the listen timeout has expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      # Reported before
      return True

    port = self._daemon.listen_port
    if port is None:
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)

      return False

    self._ts_listening = time.time()

    logging.debug("Import '%s' on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private, self._component)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time at which the daemon started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
574
class DiskExport(_DiskImportExportBase):
  """Export side of a disk transfer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Sets up an export without starting its daemon.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    super(DiskExport, self).__init__(lu, node_name, opts, instance,
                                     component, timeouts, cbs, private)

    # Where to read the data from
    self._source = source
    self._source_args = source_args

    # Where to send the data to
    self._dest_host = dest_host
    self._dest_port = dest_port

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    source = (self._source, self._source_args)

    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          source)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Always True

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time at which the daemon was started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
636
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, eta); percent and
    eta are left out of the result if they are None
  @return: The formatted parts joined by L{utils.CommaJoin}

  """
  (done_mbytes, rate, pct, eta_secs) = progress

  fields = [utils.FormatUnit(done_mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  fields.append("%0.1f MiB/s" % rate)

  if pct is not None:
    fields.append("%d%%" % pct)

  if eta_secs is not None:
    fields.append("ETA %s" % utils.FormatSeconds(eta_secs))

  return utils.CommaJoin(fields)
658
class ImportExportLoop:
  """Main loop driving a set of import/export objects.

  Objects are registered using L{Add}; L{Run} then starts their daemons,
  polls the daemon status and evaluates it until no object is active anymore.
  L{FinalizeAll} finalizes everything which has been added to the queue.

  """
  #: Lower bound for the time slept between two iterations of L{Run}
  MIN_DELAY = 1.0
  #: Upper bound for the time slept between two iterations of L{Run}
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance, used for RPC calls and warnings

    """
    self._lu = lu
    # Objects handled by the main loop
    self._queue = []
    # Objects added via Add and not yet moved to the queue
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    The object is only staged here; L{Run} moves it to the queue at the
    beginning of its next iteration.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @type daemons: dict
    @param daemons: Daemon names per node name, as returned by
      L{_GetActiveDaemonNames}
    @rtype: dict
    @return: For every node whose status call succeeded a dictionary mapping
      daemon names to their status data; failed nodes are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      # Payload entries are matched to the daemon names by position
      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Daemons which haven't been started yet are started. An object whose
    daemon fails to start is finalized with the error and left out.

    @param queue: Import/export objects to look at
    @rtype: dict
    @return: List of daemon names per node name

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    # Empty the staging list in place
    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Every iteration moves pending objects to the queue, starts missing
    daemons, fetches the daemon status and evaluates it for each active
    object. The loop ends once there are no active daemons or objects left;
    between iterations it sleeps for a time between L{MIN_DELAY} and
    L{MAX_DELAY} seconds, depending on the state of the objects.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # No entry for this node, i.e. the status call failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          # A failure of one object must not affect the others
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether finalizing worked for all objects in the queue

    """
    success = True

    for diskie in self._queue:
      # Finalize is evaluated first so it runs even after an earlier failure
      success = diskie.Finalize() and success

    return success
823
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the callbacks used when transferring instance data.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Stores the parameters for use by the callbacks.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @param instance: Instance object
    @param timeouts: Timeouts for the transfers
    @param src_node: Source node name
    @param src_cbs: Callbacks for the export on the source node
    @param dest_node: Destination node name
    @param dest_ip: IP address of destination node

    """
    super(_TransferInstCbBase, self).__init__()

    # What is transferred and how to report on it
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip
841
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the export (source) side of an instance data transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @param ie: Export object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    message = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(message)

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    @param ie: Export object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" %
                       (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Reports and records the result, invokes the transfer's finished function
    and aborts the import if the export has failed.

    @param ie: Export object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if not ie.success:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished sending data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
889
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the import (destination) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    Creates the matching export on the source node and adds it to the loop
    the import belongs to.

    @param ie: Import object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer
    @param component: transfer component name

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    transfer = dtp.data
    export = DiskExport(self.lu, self.src_node, dtp.export_opts,
                        self.dest_ip, ie.listen_port, self.instance,
                        component, transfer.src_io, transfer.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @param ie: Import object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer

    """
    message = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Reports and records the result and aborts the export, if there is one,
    in case the import has failed.

    @param ie: Import object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data of the transfer

    """
    if not ie.success:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
935
class DiskTransfer(object):
  """Description of a single disk transfer.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Stores the transfer parameters.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
961
class _DiskTransferPrivate(object):
  """Per-transfer state handed to the import/export callbacks.

  """
  def __init__(self, data, success, export_opts):
    """Stores the transfer and its initial status.

    @type data: L{DiskTransfer}
    @param data: The transfer this state belongs to
    @type success: bool
    @param success: Initial status
    @param export_opts: Options for the export belonging to this transfer

    """
    self.data = data
    self.success = success
    self.export_opts = export_opts

    # Import and export objects are filled in once they've been created
    self.dest_import = None
    self.src_export = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    @param success: Result of one part of the transfer

    """
    # A status which is false already is never overwritten
    if self.success:
      self.success = success
985
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  The value is the hexadecimal SHA1 digest over the remote import/export
  version, the seed, the instance name and the disk index.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  digest = compat.sha1_hash()

  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(part)

  return digest.hexdigest()
1004
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  An import is created on the destination node for every transfer; the
  matching export is started by the destination callbacks once the import is
  listening. Entries of C{all_transfers} which are false are not transferred
  and reported as failed.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)

  # The destination callbacks need the source ones for starting the exports
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  states = []

  # One seed is shared by all disks of this transfer
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for (idx, transfer) in enumerate(all_transfers):
      if not transfer:
        # Nothing to transfer, recorded as failed
        state = _DiskTransferPrivate(None, False, None)
      else:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        opts = objects.ImportExportOptions(
          key_name=None, ca_pem=None, compress=compress,
          magic=_GetInstDiskMagic(base_magic, instance.name, idx))

        state = _DiskTransferPrivate(transfer, True, opts)

        disk_import = DiskImport(lu, dest_node, opts, instance,
                                 "disk%d" % idx,
                                 transfer.dest_io, transfer.dest_ioargs,
                                 timeouts, dest_cbs, private=state)
        ieloop.Add(disk_import)

        state.dest_import = disk_import

      states.append(state)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(states) == len(all_transfers)
  assert compat.all((state.src_export is None or
                     state.src_export.success is not None) and
                    (state.dest_import is None or
                     state.dest_import.success is not None)
                    for state in states), \
         "Not all imports/exports are finalized"

  return [bool(state.success) for state in states]
1080
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks collecting per-disk results of an export.

  The private data passed to the callbacks is a tuple consisting of the disk
  index and a function called once the disk has finished (or None).

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks results are kept for

    """
    super(_RemoteExportCb, self).__init__()
    self._dresults = [None] * disk_count
    self._feedback_fn = feedback_fn

  @property
  def disk_results(self):
    """Returns per-disk results.

    @rtype: list
    @return: One entry per disk; None as long as the disk hasn't finished,
      a boolean afterwards

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % disk_idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (disk_idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" %
                        (disk_idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    Reports and stores the result and invokes the finished function, if any.

    """
    (disk_idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (disk_idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % disk_idx)

    self._dresults[disk_idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1134
1135 1136 -class ExportInstanceHelper:
def __init__(self, lu, feedback_fn, instance):
  """Prepares the helper; no snapshots are created yet.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object

  """
  self._instance = instance
  self._feedback_fn = feedback_fn
  self._lu = lu

  # One flag per instance disk, set once its snapshot has been removed
  self._removed_snaps = [False] * len(instance.disks)
  self._snap_disks = []
1152
def CreateSnapshots(self):
  """Creates an LVM snapshot for every disk of the instance.

  For every disk one entry is appended to the list of snapshot disks: a
  L{objects.Disk} describing the snapshot or False if the disk could not be
  snapshotted, in which case a warning is logged.

  """
  assert not self._snap_disks

  instance = self._instance
  src_node = instance.primary_node

  for (idx, disk) in enumerate(instance.disks):
    self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

    # result.payload will be a snapshot of an lvm leaf of the one we
    # passed
    result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))

    # Stays false unless the snapshot could be created
    snap_dev = False

    msg = result.fail_msg
    if msg:
      self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
    elif not (isinstance(result.payload, (tuple, list)) and
              len(result.payload) == 2):
      self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                          " result '%s'", idx, src_node, result.payload)
    else:
      snap_id = tuple(result.payload)
      lv_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
      snap_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                              logical_id=snap_id, physical_id=snap_id,
                              iv_name=disk.iv_name,
                              params=lv_params)

    self._snap_disks.append(snap_dev)

  assert len(self._snap_disks) == len(instance.disks)
  assert len(self._removed_snaps) == len(instance.disks)
1190
1191 - def _RemoveSnapshot(self, disk_index):
1192 """Removes an LVM snapshot. 1193 1194 @type disk_index: number 1195 @param disk_index: Index of the snapshot to be removed 1196 1197 """ 1198 disk = self._snap_disks[disk_index] 1199 if disk and not self._removed_snaps[disk_index]: 1200 src_node = self._instance.primary_node 1201 1202 self._feedback_fn("Removing snapshot of disk/%s on node %s" % 1203 (disk_index, src_node)) 1204 1205 result = self._lu.rpc.call_blockdev_remove(src_node, disk) 1206 if result.fail_msg: 1207 self._lu.LogWarning("Could not remove snapshot for disk/%d from node" 1208 " %s: %s", disk_index, src_node, result.fail_msg) 1209 else: 1210 self._removed_snaps[disk_index] = True
1211
1212 - def LocalExport(self, dest_node):
1213 """Intra-cluster instance export. 1214 1215 @type dest_node: L{objects.Node} 1216 @param dest_node: Destination node 1217 1218 """ 1219 instance = self._instance 1220 src_node = instance.primary_node 1221 1222 assert len(self._snap_disks) == len(instance.disks) 1223 1224 transfers = [] 1225 1226 for idx, dev in enumerate(self._snap_disks): 1227 if not dev: 1228 transfers.append(None) 1229 continue 1230 1231 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name, 1232 dev.physical_id[1]) 1233 1234 finished_fn = compat.partial(self._TransferFinished, idx) 1235 1236 # FIXME: pass debug option from opcode to backend 1237 dt = DiskTransfer("snapshot/%s" % idx, 1238 constants.IEIO_SCRIPT, (dev, idx), 1239 constants.IEIO_FILE, (path, ), 1240 finished_fn) 1241 transfers.append(dt) 1242 1243 # Actually export data 1244 dresults = TransferInstanceData(self._lu, self._feedback_fn, 1245 src_node, dest_node.name, 1246 dest_node.secondary_ip, 1247 instance, transfers) 1248 1249 assert len(dresults) == len(instance.disks) 1250 1251 self._feedback_fn("Finalizing export on %s" % dest_node.name) 1252 result = self._lu.rpc.call_finalize_export(dest_node.name, instance, 1253 self._snap_disks) 1254 msg = result.fail_msg 1255 fin_resu = not msg 1256 if msg: 1257 self._lu.LogWarning("Could not finalize export for instance %s" 1258 " on node %s: %s", instance.name, dest_node.name, msg) 1259 1260 return (fin_resu, dresults)
1261
1262 - def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1263 """Inter-cluster instance export. 1264 1265 @type disk_info: list 1266 @param disk_info: Per-disk destination information 1267 @type key_name: string 1268 @param key_name: Name of X509 key to use 1269 @type dest_ca_pem: string 1270 @param dest_ca_pem: Destination X509 CA in PEM format 1271 @type timeouts: L{ImportExportTimeouts} 1272 @param timeouts: Timeouts for this import 1273 1274 """ 1275 instance = self._instance 1276 1277 assert len(disk_info) == len(instance.disks) 1278 1279 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) 1280 1281 ieloop = ImportExportLoop(self._lu) 1282 try: 1283 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, 1284 disk_info)): 1285 # Decide whether to use IPv6 1286 ipv6 = netutils.IP6Address.IsValid(host) 1287 1288 opts = objects.ImportExportOptions(key_name=key_name, 1289 ca_pem=dest_ca_pem, 1290 magic=magic, ipv6=ipv6) 1291 1292 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) 1293 finished_fn = compat.partial(self._TransferFinished, idx) 1294 ieloop.Add(DiskExport(self._lu, instance.primary_node, 1295 opts, host, port, instance, "disk%d" % idx, 1296 constants.IEIO_SCRIPT, (dev, idx), 1297 timeouts, cbs, private=(idx, finished_fn))) 1298 1299 ieloop.Run() 1300 finally: 1301 ieloop.FinalizeAll() 1302 1303 return (True, cbs.disk_results)
1304
1305 - def _TransferFinished(self, idx):
1306 """Called once a transfer has finished. 1307 1308 @type idx: number 1309 @param idx: Disk index 1310 1311 """ 1312 logging.debug("Transfer %s finished", idx) 1313 self._RemoveSnapshot(idx)
1314
1315 - def Cleanup(self):
1316 """Remove all snapshots. 1317 1318 """ 1319 assert len(self._removed_snaps) == len(self._instance.disks) 1320 for idx in range(len(self._instance.disks)): 1321 self._RemoveSnapshot(idx)
1322
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks keeping track of the per-disk state of a remote import.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)

    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Salt used when computing the signed information of every disk
    self._salt = utils.GenerateSecret(8)

    # Per-disk state: transfer result and (port, magic) of the listening
    # daemon, both None while unknown
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    all_listening = compat.all(entry is not None
                               for entry in self._daemon_port)
    if not all_listening:
      return

    address = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         disk_idx, address, port, magic)
             for (disk_idx, (port, magic)) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    info = {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      }
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, info)

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    Records port and magic of the daemon and checks whether all daemons are
    listening by now.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    listen_info = (ie.listen_port, ie.magic)
    self._daemon_port[idx] = listen_info

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    Stores the outcome of the transfer as the result of the disk.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      text = "Disk %s finished receiving data" % idx
    else:
      text = (("Disk %s failed to receive data: %s"
               " (recent output: %s)") %
              (idx, ie.final_message, ie.recent_output))
    self._feedback_fn(text)

    self._dresults[idx] = bool(ie.success)
1420
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  An X509 key and certificate are created on the instance's primary node
  before the import and removed again afterwards, whether or not the import
  worked. One L{DiskImport} is added to the import/export loop per disk.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results collected by the import callbacks

  """
  crypto = OpenSSL.crypto

  source_ca_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (key_name, cert_pem) = result.payload
  try:
    # Load certificate
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)

    # Sign certificate
    signed_cert_pem = utils.SignX509Certificate(cert, cds,
                                                utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for disk_idx, disk in enumerate(instance.disks):
        disk_magic = _GetInstDiskMagic(magic_base, instance.name, disk_idx)

        # Import daemon options
        import_opts = objects.ImportExportOptions(key_name=key_name,
                                                  ca_pem=source_ca_pem,
                                                  magic=disk_magic, ipv6=ipv6)

        disk_import = DiskImport(lu, instance.primary_node, import_opts,
                                 instance, "disk%d" % disk_idx,
                                 constants.IEIO_SCRIPT, (disk, disk_idx),
                                 timeouts, cbs, private=(disk_idx, ))
        ieloop.Add(disk_import)

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1490
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  The message consists of the version and L{constants.RIE_HANDSHAKE},
  separated by a colon.

  @type version: number
  @param version: Protocol version

  """
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
1499
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Protocol version, HMAC digest of the handshake message and the
    salt used for the digest

  """
  version = constants.RIE_VERSION

  # A new salt is generated for every handshake
  salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(version)
  digest = utils.Sha1Hmac(cds, message, salt=salt)

  return (version, digest, salt)
1511
1512 1513 -def CheckRemoteExportHandshake(cds, handshake):
1514 """Checks the handshake of a remote import/export. 1515 1516 @type cds: string 1517 @param cds: Cluster domain secret 1518 @type handshake: sequence 1519 @param handshake: Handshake sent by remote peer 1520 1521 """ 1522 try: 1523 (version, hmac_digest, hmac_salt) = handshake 1524 except (TypeError, ValueError), err: 1525 return "Invalid data: %s" % err 1526 1527 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), 1528 hmac_digest, salt=hmac_salt): 1529 return "Hash didn't match, clusters don't share the same domain secret" 1530 1531 if version != constants.RIE_VERSION: 1532 return ("Clusters don't have the same remote import/export protocol" 1533 " (local=%s, remote=%s)" % 1534 (constants.RIE_VERSION, version)) 1535 1536 return None
1537
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  All four values are joined using colons, in the order of the parameters.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  fields = (disk_index, host, port, magic)
  return "%s:%s:%s:%s" % fields
1553
1554 1555 -def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1556 """Verifies received disk information for an export. 1557 1558 @type cds: string 1559 @param cds: Cluster domain secret 1560 @type disk_index: number 1561 @param disk_index: Index of disk (included in hash) 1562 @type disk_info: sequence 1563 @param disk_info: Disk information sent by remote peer 1564 1565 """ 1566 try: 1567 (host, port, magic, hmac_digest, hmac_salt) = disk_info 1568 except (TypeError, ValueError), err: 1569 raise errors.GenericError("Invalid data: %s" % err) 1570 1571 if not (host and port and magic): 1572 raise errors.GenericError("Missing destination host, port or magic") 1573 1574 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) 1575 1576 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): 1577 raise errors.GenericError("HMAC is wrong") 1578 1579 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): 1580 destination = host 1581 else: 1582 destination = netutils.Hostname.GetNormalizedName(host) 1583 1584 return (destination, 1585 utils.ValidateServiceName(port), 1586 magic)
1587
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: Host, port, magic value, HMAC digest and salt

  """
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)
1609
def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  @param cluster: Object providing C{SimpleFillIPolicy}
  @param group: Object whose C{ipolicy} attribute is passed to
    C{SimpleFillIPolicy}
  @return: Whatever C{cluster.SimpleFillIPolicy} returns

  """
  fill_fn = cluster.SimpleFillIPolicy
  return fill_fn(group.ipolicy)
1616
def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template

  @param disk_template: Disk template (one of the C{constants.DT_*} values
    listed below)
  @param disks: Iterable of disk definitions, each providing its size under
    the L{constants.IDISK_SIZE} key
  @return: Required size for the given disk template
  @raise errors.ProgrammerError: If the size requirement of the disk template
    is unknown

  """
  # Walk over the disks just once; every template's requirement is derived
  # from the sum of the disk sizes and the number of disks
  total_size = 0
  disk_count = 0
  for disk in disks:
    total_size += disk[constants.IDISK_SIZE]
    disk_count += 1

  # Required size per disk template
  req_size_dict = {
    constants.DT_DISKLESS: 0,
    constants.DT_PLAIN: total_size,
    # DRBD_META_SIZE is added for the drbd metadata of each disk
    constants.DT_DRBD8: total_size + disk_count * constants.DRBD_META_SIZE,
    constants.DT_FILE: total_size,
    constants.DT_SHARED_FILE: total_size,
    constants.DT_BLOCK: 0,
    constants.DT_RBD: total_size,
    constants.DT_EXT: total_size,
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
1641