
Source Code for Module ganeti.cmdlib.backup

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with backup operations."""

import OpenSSL
import logging

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import masterd
from ganeti import utils
from ganeti.utils import retry

from ganeti.cmdlib.base import NoHooksLU, LogicalUnit
from ganeti.cmdlib.common import CheckNodeOnline, ExpandNodeUuidAndName, \
  IsInstanceRunning, DetermineImageSize
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
  ShutdownInstanceDisks, TemporaryDisk, ImageDisks
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance, \
  CheckCompressionTool

class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    self._cds = GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    """
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      salt = utils.GenerateSecret(8)

      feedback_fn("Generating X509 certificate on %s" %
                  self.cfg.GetNodeName(self.instance.primary_node))
      result = self.rpc.call_x509_cert_create(self.instance.primary_node,
                                              constants.RIE_CERT_VALIDITY)
      result.Raise("Can't create X509 key and certificate on %s" %
                   self.cfg.GetNodeName(result.node))

      (name, cert_pem) = result.payload

      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)

      return {
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
                          salt),
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
        }

    return None

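# A hedged note on how LUBackupPrepare's result is consumed (a sketch, not a
# verbatim API reference): for remote exports the returned dictionary travels
# back to the caller, which hands the "x509_key_name" triple and the signed
# CA on to the export opcode handled by LUBackupExport below. A hypothetical
# opcode-level invocation might look like:
#
#   op = opcodes.OpBackupPrepare(instance_name="inst1.example.com",
#                                mode=constants.EXPORT_MODE_REMOTE)
#
# (The literal instance name is an illustrative assumption.)
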
class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

    if self.op.zero_free_space and not self.op.compress:
      raise errors.OpPrereqError("Zeroing free space does not make sense "
                                 "unless compression is used")

    if self.op.zero_free_space and not self.op.shutdown:
      raise errors.OpPrereqError("Unless the instance is shut down, zeroing "
                                 "cannot be used.")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    # In case we are zeroing, a node lock is required as we will be creating
    # and destroying a disk - allocations should be stopped, but not on the
    # entire cluster
    if self.op.zero_free_space:
      self.recalculate_locks = {locking.LEVEL_NODE: constants.LOCKS_REPLACE}
      self._LockInstancesNodes(primary_only=True)

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      (self.op.target_node_uuid, self.op.target_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
                              self.op.target_node)
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have to lock all nodes, as we don't know
      # where the previous export might be, and in this LU we search for it
      # and remove it from its current node. In the future we could fix this
      # by:
      #  - making a tasklet to search (share-lock all), then create the new
      #    one, then one to remove, after
      #  - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(BuildInstanceHookEnvByObject(
      self, self.instance,
      secondary_nodes=self.secondary_nodes, disks=self.inst_disks))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      nl.append(self.op.target_node_uuid)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Can not remove instance without shutting it"
                                 " down first", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
      assert self.dst_node is not None

      CheckNodeOnline(self, self.dst_node.uuid)
      CheckNodeNotDrained(self, self.dst_node.uuid)

      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node),
                                    self.op.instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)

      cds = GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError, err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # Instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.cfg.GetInstanceDisks(self.instance.uuid):
      if disk.dev_type in constants.DTS_FILEBASED:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

    # Check prerequisites for zeroing
    if self.op.zero_free_space:
      # Check that user shutdown detection has been enabled
      hvparams = self.cfg.GetClusterInfo().FillHV(self.instance)
      if self.instance.hypervisor == constants.HT_KVM and \
         not hvparams.get(constants.HV_KVM_USER_SHUTDOWN, False):
        raise errors.OpPrereqError("Instance shutdown detection must be "
                                   "enabled for zeroing to work")

      # Check that the instance is set to boot from the disk
      if constants.HV_BOOT_ORDER in hvparams and \
         hvparams[constants.HV_BOOT_ORDER] != constants.HT_BO_DISK:
        raise errors.OpPrereqError("Booting from disk must be set for zeroing "
                                   "to work")

      # Check that the zeroing image is set
      if not self.cfg.GetZeroingImage():
        raise errors.OpPrereqError("A zeroing image must be set for zeroing to"
                                   " work")

      if self.op.zeroing_timeout_fixed is None:
        self.op.zeroing_timeout_fixed = constants.HELPER_VM_STARTUP

      if self.op.zeroing_timeout_per_mib is None:
        self.op.zeroing_timeout_per_mib = constants.ZEROING_TIMEOUT_PER_MIB

    else:
      if (self.op.zeroing_timeout_fixed is not None or
          self.op.zeroing_timeout_per_mib is not None):
        raise errors.OpPrereqError("Zeroing timeout options can only be used"
                                   " with the --zero-free-space option")

    self.secondary_nodes = \
      self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
    self.inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)

    # Check if the compression tool is whitelisted
    CheckCompressionTool(self, self.op.compress)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    node_uuids = self.cfg.GetNodeList()
    node_uuids.remove(self.dst_node.uuid)

    # On one-node clusters the node list will be empty after the removal; if
    # we proceeded, the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if node_uuids:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(node_uuids)
      for node_uuid in exportlist:
        if exportlist[node_uuid].fail_msg:
          continue
        if iname in exportlist[node_uuid].payload:
          msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname,
                            self.cfg.GetNodeName(node_uuid), msg)

  def _InstanceDiskSizeSum(self):
    """Calculates the size of all the disks of the instance used in this LU.

    @rtype: int
    @return: Size of the disks in MiB

    """
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    return sum([d.size for d in inst_disks])

  def ZeroFreeSpace(self, feedback_fn):
    """Zeroes the free space on a shutdown instance.

    @type feedback_fn: function
    @param feedback_fn: Function used to log progress

    """
    assert self.op.zeroing_timeout_fixed is not None
    assert self.op.zeroing_timeout_per_mib is not None

    zeroing_image = self.cfg.GetZeroingImage()
    src_node_uuid = self.instance.primary_node

    try:
      disk_size = DetermineImageSize(self, zeroing_image, src_node_uuid)
    except errors.OpExecError, err:
      raise errors.OpExecError("Could not create temporary disk for zeroing:"
                               " %s" % err)

    # Calculate the sum prior to adding the temporary disk
    instance_disks_size_sum = self._InstanceDiskSizeSum()

    with TemporaryDisk(self,
                       self.instance,
                       [(constants.DT_PLAIN, constants.DISK_RDWR, disk_size)],
                       feedback_fn):
      feedback_fn("Activating instance disks")
      StartInstanceDisks(self, self.instance, False)

      feedback_fn("Imaging disk with zeroing image")
      ImageDisks(self, self.instance, zeroing_image)

      feedback_fn("Starting instance with zeroing image")
      result = self.rpc.call_instance_start(src_node_uuid,
                                            (self.instance, [], []),
                                            False, self.op.reason)
      result.Raise("Could not start instance %s when using the zeroing image "
                   "%s" % (self.instance.name, zeroing_image))

      # First wait for the instance to start up
      running_check = lambda: IsInstanceRunning(self, self.instance,
                                                prereq=False)
      instance_up = retry.SimpleRetry(True, running_check, 5.0,
                                      self.op.shutdown_timeout)
      if not instance_up:
        raise errors.OpExecError("Could not boot instance when using the "
                                 "zeroing image %s" % zeroing_image)

      feedback_fn("Instance is up, now awaiting shutdown")

      # Then wait for the zeroing to finish, detected by the instance
      # shutting down
      timeout = self.op.zeroing_timeout_fixed + \
                self.op.zeroing_timeout_per_mib * instance_disks_size_sum
      instance_up = retry.SimpleRetry(False, running_check, 20.0, timeout)
      if instance_up:
        self.LogWarning("Zeroing not completed prior to timeout; instance"
                        " will be shut down forcibly")

    feedback_fn("Zeroing completed!")
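
  # Illustrative timeout arithmetic (the numbers are made up, not the real
  # constants): with zeroing_timeout_fixed = 300 s and
  # zeroing_timeout_per_mib = 0.1 s/MiB, an instance with 10240 MiB of disks
  # is given at most 300 + 0.1 * 10240 = 1324 s to zero itself and shut down.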

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    assert self.op.mode in constants.EXPORT_MODES

    src_node_uuid = self.instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % self.instance.name)
      result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (self.instance.name,
                                 self.cfg.GetNodeName(src_node_uuid)))

    if self.op.zero_free_space:
      self.ZeroFreeSpace(feedback_fn)

    activate_disks = not self.instance.disks_active

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % self.instance.name)
      StartInstanceDisks(self, self.instance, None)
      self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     self.instance)

      helper.CreateSnapshots()
      try:
        if (self.op.shutdown and
            self.instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert self.instance.disks_active
          feedback_fn("Starting instance %s" % self.instance.name)
          result = self.rpc.call_instance_start(src_node_uuid,
                                                (self.instance, None, None),
                                                False, self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            ShutdownInstanceDisks(self, self.instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node,
                                                    self.op.compress)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     self.op.compress,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(self.instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % self.instance.name)
        ShutdownInstanceDisks(self, self.instance)

    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point, the export was successful, we can cleanup/finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % self.instance.name)
      RemoveInstance(self, feedback_fn, self.instance,
                     self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults

class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # We need all nodes to be locked in order for RemoveExport to work, but
      # we don't need to lock the instance itself, as nothing will happen to
      # it (and we can remove exports also for a removed instance)
      locking.LEVEL_NODE: locking.ALL_SET,

      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but it
    # doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed
    # in. This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not inst_name:
      fqdn_warn = True
      inst_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node_uuid in exportlist:
      msg = exportlist[node_uuid].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      if inst_name in exportlist[node_uuid].payload:
        found = True
        result = self.rpc.call_export_remove(node_uuid, inst_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", inst_name,
                        self.cfg.GetNodeName(node_uuid), msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
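
# A hedged usage sketch (standard Ganeti CLI; exact flags can differ between
# versions): the logical units above back the gnt-backup commands, e.g.
#
#   gnt-backup export -n node2.example.com inst1.example.com
#   gnt-backup remove inst1.example.com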