Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with storage of instances.""" 
  32   
  33  import itertools 
  34  import logging 
  35  import os 
  36  import time 
  37   
  38  from ganeti import compat 
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import ht 
  42  from ganeti import locking 
  43  from ganeti.masterd import iallocator 
  44  from ganeti import objects 
  45  from ganeti import utils 
  46  from ganeti import rpc 
  47  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  48  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  49    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  50    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  51    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  52    CheckDiskTemplateEnabled 
  53  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  54    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  55    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  56   
  57  import ganeti.masterd.instance 
  58   
  59   
# Per-template prefix placed in front of ".disk<N>" when GenerateDiskTemplate
# asks for unique volume names. Templates that are missing from this mapping
# get no generated names at all (their "names" list is None).
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create one block device on one node.

  Children of the device are not visited; the caller has to create them
  beforehand.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  rpc_args = (node_uuid, (device, instance), device.size, instance.name,
              force_open, info, excl_stor)
  creation = lu.rpc.call_blockdev_create(*rpc_args)

  # The message is built after the RPC returned; the result's Raise() is
  # handed the text (presumably it only raises when the call failed).
  node_name = lu.cfg.GetNodeName(node_uuid)
  failure_msg = ("Can't create block device %s on"
                 " node %s for instance %s" %
                 (device, node_name, instance.name))
  creation.Raise(failure_msg)
100
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices, as (node_uuid, device) pairs
  @raise errors.DeviceCreationError: if the creation fails; its
      C{created_devices} attribute lists what had been created so far

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError as e:
    # A child failed: add the devices created at this level so that the
    # caller can undo all of them. The bare "raise" re-raises the very same
    # exception object without discarding the original traceback.
    e.created_devices.extend(created_devices)
    raise
  except errors.OpExecError as e:
    # Convert a plain execution error into one that carries the list of
    # devices that need to be cleaned up.
    raise errors.DeviceCreationError(str(e), created_devices)
161
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage is in effect for a node given by UUID.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)

  # The configuration does not know this UUID
  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)
180
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Annotate the root device and hand it to L{_CreateBlockDevInner}.

  The exclusive_storage flag of the target node is looked up here and passed
  along with the annotated disk.

  @return: whatever L{_CreateBlockDevInner} returns (the created devices)

  """
  annotated = AnnotateDiskParams(instance, [device], lu.cfg)
  # Exactly one disk went in, so exactly one must come back
  (root_disk,) = annotated
  exclusive = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, root_disk,
                              force_create, info, force_open, exclusive)
193
def _UndoCreateDisks(lu, disks_created, instance):
  """Remove again the disks that L{CreateDisks} managed to create.

  Meant to be called when disk creation failed half-way. A failed removal is
  only reported through the RPC result's Warn() method (with
  C{logging.warning} as the reporting function) and does not stop the loop.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}, i.e. an
      iterable of (node_uuid, disk) pairs
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for created in disks_created:
    (node_uuid, disk) = created
    removal = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    node_name = lu.cfg.GetNodeName(node_uuid)
    removal.Warn("Failed to remove newly-created disk %s on node %s" %
                 (disk, node_name), logging.warning)
212
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create the disks of an instance on all the nodes that need them.

  This abstracts away some work from AddInstance. For file-based disk
  templates the storage directory (taken from the path of the first disk of
  the instance) is created on the primary node first.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpExecError: if a device cannot be created; everything
      created up to that point is removed again before raising

  """
  info = GetInstanceInfoText(instance)

  if target_node_uuid is not None:
    # Only the explicitly requested node is touched
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]
  else:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    first_disk_path = instance.disks[0].logical_id[1]
    file_storage_dir = os.path.dirname(first_disk_path)
    dir_result = lu.rpc.call_file_storage_dir_create(pnode_uuid,
                                                     file_storage_dir)
    dir_result.Raise("Failed to create directory '%s' on"
                     " node %s" % (file_storage_dir,
                                   lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for (idx, device) in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Creation and opening are forced on the primary node only
      on_primary = (node_uuid == pnode_uuid)
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, on_primary, info,
                        on_primary)
      except errors.DeviceCreationError as err:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Roll back both the fully created disks and the partial results
        # reported by the failed creation
        disks_created.extend(err.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(err.message)
      else:
        disks_created.append((node_uuid, device))
  return disks_created
274
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  @param disk_template: the disk template of the instance
  @param disks: list of disk dictionaries; for the LVM-based templates each
      one must contain L{constants.IDISK_VG} and L{constants.IDISK_SIZE}
  @rtype: dict
  @return: mapping of volume group name to the total space required in it;
      empty for templates which do not use volume groups
  @raise errors.ProgrammerError: if the disk template is not known here

  """
  def _compute(disks, payload):
    """Sum up size plus per-disk payload for every volume group.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # Accumulate under the disk's own volume group so that several disks
      # living in the same VG add up instead of overwriting each other.
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def ComputeDisks(op, default_vg):
  """Validate the disk definitions of an opcode and normalize them.

  Every entry of C{op.disks} is checked (access mode, size, provider) and
  turned into a new dictionary holding the size, mode, volume group and name
  plus the optional metavg/adopt/spindles values. For the ext template the
  provider and all non-standard keys are copied over as well.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks
  @raise errors.OpPrereqError: on an invalid access mode, a missing or
      non-integer size, a provider given for a non-ext template, or a
      missing provider for the ext template

  """
  optional_keys = [
    constants.IDISK_METAVG,
    constants.IDISK_ADOPT,
    constants.IDISK_SPINDLES,
    ]

  result = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)

    raw_size = disk.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)

    # A name spelled like the "none" marker means "no name"
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None

    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }
    new_disk.update((key, disk[key]) for key in optional_keys if key in disk)

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not ext_provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      new_disk[constants.IDISK_PROVIDER] = ext_provider
      new_disk.update((key, value) for (key, value) in disk.items()
                      if key not in constants.IDISK_PARAMS)

    result.append(new_disk)

  return result
373
def CheckRADOSFreeSpace():
  """Check the free space inside the RADOS cluster.

  Currently a no-op that always returns C{None}.

  """
  # For the RADOS cluster we assume there is always enough space.
  return None
381
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Build a drbd8 disk object together with its data and metadata volumes.

  A port and a shared secret are requested from the configuration, then two
  plain volumes (data of the requested size, metadata of
  L{constants.DRBD_META_SIZE}) are created as children of the DRBD device.
  Each of the three disk objects receives its own unique ID.

  @param vgnames: two volume group names, for the data and the metadata
      volume respectively
  @param names: two volume names, in the same order as C{vgnames}
  @return: the DRBD8 L{objects.Disk}

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  def _NewUuid():
    """Ask the configuration for a fresh unique ID.

    """
    return lu.cfg.GenerateUniqueID(lu.proc.GetECId())

  data_lv = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                         logical_id=(vgnames[0], names[0]),
                         params={})
  data_lv.uuid = _NewUuid()

  meta_lv = objects.Disk(dev_type=constants.DT_PLAIN,
                         size=constants.DRBD_META_SIZE,
                         logical_id=(vgnames[1], names[1]),
                         params={})
  meta_lv.uuid = _NewUuid()

  drbd_id = (primary_uuid, secondary_uuid, port, p_minor, s_minor,
             shared_secret)
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=drbd_id,
                          children=[data_lv, meta_lv],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = _NewUuid()
  return drbd_dev
409
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @param template_name: the disk template to generate disks for
  @param instance_uuid: UUID of the owning instance (used when allocating
      DRBD minors)
  @param primary_node_uuid: UUID of the primary node
  @param secondary_node_uuids: list of secondary node UUIDs; must hold
      exactly one entry for DRBD8 and be empty for the other templates
  @param disk_info: list of disk dictionaries (size, mode, vg, ...)
  @param file_storage_dir: directory used for file-based disks
  @param file_driver: driver used for file-based disks
  @param base_index: index of the first generated disk
  @param feedback_fn: function called with a progress message per disk
      (not called for the DRBD8 template)
  @param full_disk_params: disk parameters used to compute the DRBD defaults
  @return: list of L{objects.Disk} objects; empty for the diskless template
  @raise errors.ProgrammerError: on a wrong number of secondary nodes, an
      unknown template, or a missing provider for the ext template

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    # Two minors (primary, secondary) are needed for every disk
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two names per disk: names[2 * i] is the data volume of disk i,
    # names[2 * i + 1] its metadata volume
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Every logical_id_fn takes (idx, disk_index, disk), where idx is the
    # position within disk_info (and hence within names)
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      # idx is an explicit parameter here; do not rely on a variable of the
      # same name happening to exist in the enclosing scope
      logical_id_fn = \
        lambda idx, _, disk: (file_driver,
                              "%s/%s" % (file_storage_dir,
                                         names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          # Format the message here; exceptions do not interpolate extra
          # arguments the way logging calls do
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Validate the spindles option against the exclusive_storage setting.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given although exclusive
      storage is not active, or when they are required (and exclusive
      storage is active) but missing

  """
  # A key that is present but set to None counts as "not given"
  spindles_given = (constants.IDISK_SPINDLES in diskdict and
                    diskdict[constants.IDISK_SPINDLES] is not None)

  if spindles_given and not es_flag:
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)

  if es_flag and required and not spindles_given:
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
544
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  The disks to recreate are given by index, optionally together with new
  size/mode/spindles values. The instance can be moved to new nodes at the
  same time, either given explicitly (C{op.nodes}) or chosen by an
  iallocator (C{op.iallocator}); in that case all disks must be recreated.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # The only disk parameters that may be changed while recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  # (this class-level assert fails as soon as constants.IDISK_PARAMS gains
  # or loses a member, forcing a review of the two sets)
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success C{self.op.node_uuids} and C{self.op.nodes} are replaced by
    the nodes selected by the allocator.

    @raise errors.OpPrereqError: if the allocator run was not successful

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    # Describe the existing disks to the allocator
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    # The request must ask for as many nodes as the instance uses now
    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Normalize and validate the opcode arguments.

    A plain list of disk indices is converted into a list of
    (index, parameters) pairs; duplicate indices and attempts to change
    parameters outside L{_MODIFYABLE} are rejected.

    @raise errors.OpPrereqError: on duplicate disks or unmodifiable
        parameters

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declare the initial lock requirements.

    With explicit nodes their UUIDs are requested as node locks right away;
    otherwise the node lock list starts empty and is filled in by
    L{DeclareLocks}. Node group and node allocation locks are only requested
    when an iallocator is used.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    @param level: the locking level being declared; only the node group,
        node and node resource levels are handled here

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the same list (master node plus all instance nodes) twice

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.
    It also validates the requested disk indices and replacement nodes,
    runs the iallocator if one was given, and stores the instance in
    C{self.instance} and the per-disk changes in C{self.disks}.

    @raise errors.OpPrereqError: on a wrong number of replacement nodes, a
        diskless instance, an invalid disk index, a partial recreation
        combined with a node change, or invalid spindles parameters

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      # The first replacement node becomes the new primary
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    # Map of disk index to requested changes; no explicit disks means
    # "all disks, unchanged"
    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    # NOTE(review): this comparison relies on range() returning a list, as it
    # does on Python 2
    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    # Exclusive storage counts as active if any involved node has it
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    The modifications are collected first and applied to the instance's
    disk objects afterwards; then the non-skipped disks are created and,
    if the cluster has C{prealloc_wipe_disks} set, wiped.

    @param feedback_fn: function used to send feedback back to the caller

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        # Port and secret are kept, nodes and minors are replaced
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    # NOTE(review): the configuration is only written back when nodes were
    # replaced -- confirm that plain size/mode/spindles changes are meant to
    # be left unsaved here
    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      # (index, disk, offset) triples, wiping from offset 0
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)
870
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Builds the arguments for a node info call and executes it.

  The call asks for the LVM volume group storage unit and for the
  cluster's hypervisor, using the cluster-level hypervisor parameters.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the node info RPC call

  """
  units = rpc.PrepareStorageUnitsForNodes(lu.cfg,
                                          [(constants.ST_LVM_VG, vg)],
                                          node_uuids)
  hv_name = lu.cfg.GetHypervisorType()
  cluster_hvparams = lu.cfg.GetClusterInfo().hvparams
  hv_specs = [(hv_name, cluster_hvparams[hv_name])]
  return lu.rpc.call_node_info(node_uuids, units, hv_specs)
891
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
    space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    # Carry an error code like the other prerequisite errors raised below
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    # A missing or non-integer value means the node could not report it
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)
923
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Verifies that every given node has enough free space in one VG.

  A single node info call is made for all nodes; each node's answer is
  then checked in turn. A failed call for a node or insufficient space
  on it results in an OpPrereqError.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  results = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    node_result = results[uuid]
    node_result.Raise("Cannot get current information from node %s" % name,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, node_result.payload, vg, requested)
952
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Verifies that the nodes have enough free space in each listed VG.

  Every volume group in C{req_sizes} is checked on all given nodes. If a
  node has too little space, or the information cannot be obtained from
  it, an OpPrereqError is raised.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg_name in req_sizes:
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, req_sizes[vg_name])
975
976 977 -def _DiskSizeInBytesToMebibytes(lu, size):
978 """Converts a disk size in bytes to mebibytes. 979 980 Warns and rounds up if the size isn't an even multiple of 1 MiB. 981 982 """ 983 (mib, remainder) = divmod(size, 1024 * 1024) 984 985 if remainder != 0: 986 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 987 " to not overwrite existing data (%s bytes will not be" 988 " wiped)", (1024 * 1024) - remainder) 989 mib += 1 990 991 return mib
992
993 994 -def _CalcEta(time_taken, written, total_size):
995 """Calculates the ETA based on size written and total size. 996 997 @param time_taken: The time taken so far 998 @param written: amount written so far 999 @param total_size: The total size of data to be written 1000 @return: The remaining time in seconds 1001 1002 """ 1003 avg_time = time_taken / float(written) 1004 return (total_size - written) * avg_time
1005
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node before wiping and
  resumed afterwards, also when wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
      start offset; if None, all disks of the instance are wiped from
      offset 0

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # For small disks the truncation would yield 0 and the loop below
      # would never advance, hence at least one unit is wiped per call.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 *
                       constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      # Only the area starting at this offset is written by this run
      start_offset = offset
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once per minute
        if now - last_output >= 60:
          # The elapsed time covers only what was written since
          # start_offset, so the skipped prefix must not count as progress
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    # Failing to resume is only reported, so that an error raised while
    # wiping is not replaced by a different one
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1099
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that undoes disk creation on failure.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: the error raised by L{WipeDisks} is re-raised
      after the cleanup

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed", instance.name)
    # Roll back using the creation result, then let the caller see the error
    _UndoCreateDisks(lu, cleanup, instance)
    raise
1121
def ExpandCheckDisks(instance, disks):
  """Returns the disks of the instance an operation should act on.

  @param instance: the instance owning the disks
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks; None selects all disks of the instance
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on
  @raise errors.ProgrammerError: if a selected disk is not one of the
      instance's disks

  """
  if disks is None:
    return instance.disks

  if set(disks).issubset(instance.disks):
    return disks

  raise errors.ProgrammerError("Can only act on disks belonging to the"
                               " target instance: expected a subset of %r,"
                               " got %r" % (instance.disks, disks))
1140
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are polled on its primary node
  @param disks: the disks to look at, or None for all disks of the instance
  @param oneshot: if true, the status is examined only once instead of
      polling until the sync has finished (degraded disks are still
      re-checked a few times)
  @return: True if there is nothing to wait for or no disk was found
      degraded, False otherwise
  @raise errors.RemoteError: if the node could not be queried ten times
      in a row

  """
  if not instance.disks or (disks is not None and not disks):
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  failed_polls = 0
  # One second is slept per retry, so this is also a number of seconds
  degr_retries = 10
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    reply = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    if reply.fail_msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name,
                    reply.fail_msg)
      failed_polls += 1
      if failed_polls >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue

    # A successful call resets the count of consecutive failures
    failed_polls = 0
    for (pos, mstat) in enumerate(reply.payload):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[pos].iv_name)
        continue

      # Degraded without a sync in progress is what counts as degraded
      if (not cumul_degraded and mstat.is_degraded and
          mstat.sync_percent is None):
        cumul_degraded = True

      if mstat.sync_percent is None:
        continue

      done = False
      if mstat.estimated_time is None:
        rem_time = "no time estimate"
        max_time = 5 # sleep at least a bit between retries
      else:
        rem_time = ("%s remaining (estimated)" %
                    utils.FormatSeconds(mstat.estimated_time))
        max_time = mstat.estimated_time
      lu.LogInfo("- device %s: %5.2f%% done, %s",
                 disks[pos].iv_name, mstat.sync_percent, rem_time)

    finished = done or oneshot

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if finished and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if finished:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1215
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Failures on the primary node make the function return False unless
  C{ignore_primary} is true; failures on other nodes do so unless the
  node is reported as offline.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down, or None for all of them
  @param ignore_primary: whether failures on the primary node are ignored
  @return: False if a failure that is not ignored happened, True otherwise

  """
  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  disks = ExpandCheckDisks(instance, disks)

  success = True
  for disk in disks:
    for (node_uuid, top_disk) in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if not msg:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if node_uuid == instance.primary_node:
        failure_counts = not ignore_primary
      else:
        failure_counts = not result.offline
      if failure_counts:
        success = False

  return success
1244
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance, if the instance is down.

  The instance state is checked first; L{ShutdownInstanceDisks} is only
  called when that check passes.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down, or None for all of them

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1255
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: (disks_ok, device_info); disks_ok is False if the operation
      failed, device_info is a list of
      (host, instance_visible_name, node_visible_name) with the mapping
      from node devices to instance devices; node_visible_name is None
      for disks which could not be assembled on the primary node

  """
  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(instance.uuid)

  disks = ExpandCheckDisks(instance, disks)

  all_ok = True
  mapping = []

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for (idx, inst_disk) in enumerate(disks):
    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, False, idx)
      msg = result.fail_msg
      if not msg:
        continue

      is_offline_secondary = (node_uuid in instance.secondary_nodes and
                              result.offline)
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if not ignore_secondaries and not is_offline_secondary:
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for (idx, inst_disk) in enumerate(disks):
    dev_path = None

    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        all_ok = False
      else:
        (dev_path, _, _) = result.payload

    mapping.append((lu.cfg.GetNodeName(instance.primary_node),
                    inst_disk.iv_name, dev_path))

  if not all_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return (all_ok, mapping)
1349
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  If the disks cannot be assembled they are shut down again and an error
  is raised.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are started
  @param force: passed on as C{ignore_secondaries}; if it is false (but
      not None), a hint about '--force' is logged on failure
  @raise errors.OpExecError: if the disks could not be assembled

  """
  (assembled, _) = AssembleInstanceDisks(lu, instance,
                                         ignore_secondaries=force)
  if assembled:
    return

  ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1364
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the node and node resource locks of the instance.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes
    are online, that its disk template can be grown and that the request
    does not shrink the disk. It also computes C{self.delta} and
    C{self.target} and checks the free disk space.

    @raise errors.OpPrereqError: if the disk template is not growable or
        the requested size or increment is invalid

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # The amount is the new total size
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # The amount is an increment on top of the current size
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Check the free disk space on the nodes, where that is meaningful.

    @param node_uuids: the UUIDs of the nodes to check
    @param req_vgspace: the required space, as accepted by
        L{CheckNodesFreeDiskPerVG}

    """
    template = self.instance.disk_template
    if (template not in constants.DTS_NO_FREE_SPACE_CHECK and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The grow request is first sent to all nodes in dry-run mode, then for
    real on the backing storage of all nodes and finally for the logical
    storage on the primary node. Afterwards the new size is recorded, the
    added space is wiped if the cluster asks for it and, if requested, the
    disk sync is awaited.

    @param feedback_fn: function used to report progress
    @raise errors.OpExecError: if the disk cannot be activated or its
        current size cannot be retrieved

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Report the node by name, like all other messages of this class,
      # instead of by its UUID
      primary_name = self.cfg.GetNodeName(self.instance.primary_node)

      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
        self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   primary_name)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % primary_name)
      (disk_size_in_bytes, _) = disk_dimensions

      # The size before growing is where wiping has to start
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1560
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The actual work is delegated to a L{TLReplaceDisks} tasklet created in
  L{ExpandNames}.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    A new node or an iallocator is required when changing the secondary
    and refused in every other mode.

    """
    target_given = (self.op.remote_node is not None or
                    self.op.iallocator is not None)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      if not target_given:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      CheckIAllocatorOrNode(self, "iallocator", "remote_node")
    elif target_given:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance, prepare the node locks and create the tasklet.

    """
    self._ExpandAndLockInstance()

    for lock_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES,
                       locking.LEVEL_NODEGROUP):
      assert lock_level not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is None:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
    else:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator,
                                   self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Declare the node group, node and node resource locks.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is None:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()
      else:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [member_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for member_uuid in self.cfg.GetNodeGroup(group_uuid).members]

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    inst = self.replacer.instance
    old_secondary = self.cfg.GetNodeName(inst.secondary_nodes[0])
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": old_secondary,
      }
    env.update(BuildInstanceHookEnvByObject(self, inst))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = [self.cfg.GetMasterNode(),
                  self.replacer.instance.primary_node]
    if self.op.remote_node_uuid is not None:
      hook_nodes.append(self.op.remote_node_uuid)
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    Re-validates the optimistically acquired node group locks, if any,
    and then runs the generic prerequisite check.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, groups)

    return LogicalUnit.CheckPrereq(self)
1702
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Declare the locks for the nodes of the instance.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its primary
    node is online.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = inst
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    @return: the device information computed by L{AssembleInstanceDisks}
    @raise errors.OpExecError: if the disks cannot be activated or, when
        waiting for the sync was requested, are degraded

    """
    (activated, info) = AssembleInstanceDisks(
      self, self.instance, ignore_size=self.op.ignore_size)
    if not activated:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return info
1746
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Declare the locks for the nodes of the instance.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = inst
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Unless forced, the disks are only shut down after the instance state
    check of L{_SafeShutdownInstanceDisks} has passed.

    """
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, self.instance)
    else:
      ShutdownInstanceDisks(self, self.instance)
1781
1782 1783 -def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary, 1784 ldisk=False):
1785 """Check that mirrors are not degraded. 1786 1787 @attention: The device has to be annotated already. 1788 1789 The ldisk parameter, if True, will change the test from the 1790 is_degraded attribute (which represents overall non-ok status for 1791 the device(s)) to the ldisk (representing the local storage status). 1792 1793 """ 1794 result = True 1795 1796 if on_primary or dev.AssembleOnSecondary(): 1797 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance)) 1798 msg = rstats.fail_msg 1799 if msg: 1800 lu.LogWarning("Can't find disk on node %s: %s", 1801 lu.cfg.GetNodeName(node_uuid), msg) 1802 result = False 1803 elif not rstats.payload: 1804 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid)) 1805 result = False 1806 else: 1807 if ldisk: 1808 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 1809 else: 1810 result = result and not rstats.payload.is_degraded 1811 1812 if dev.children: 1813 for child in dev.children: 1814 result = result and _CheckDiskConsistencyInner(lu, instance, child, 1815 node_uuid, on_primary) 1816 1817 return result
1818
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Annotate a device and run L{_CheckDiskConsistencyInner} on it.

  @param lu: logical unit; its C{cfg} is used for the annotation
  @param instance: the instance owning the device
  @param dev: the (not yet annotated) device to check
  @param node_uuid: the node on which the device is checked
  @param on_primary: whether C{node_uuid} is the instance's primary node
  @param ldisk: passed through to L{_CheckDiskConsistencyInner}
  @return: the result of L{_CheckDiskConsistencyInner}

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # exactly one annotated device is expected back
  (anno_dev,) = annotated

  return _CheckDiskConsistencyInner(lu, instance, anno_dev, node_uuid,
                                    on_primary, ldisk=ldisk)
1827
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Run call_blockdev_find on a device after annotating its diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # exactly one annotated device is expected back
  (anno_dev,) = annotated

  return lu.rpc.call_blockdev_find(node_uuid, (anno_dev, instance))
1841
1842 1843 -def _GenerateUniqueNames(lu, exts):
1844 """Generate a suitable LV name. 1845 1846 This will generate a logical volume name for the given instance. 1847 1848 """ 1849 results = [] 1850 for val in exts: 1851 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 1852 results.append("%s%s" % (new_id, val)) 1853 return results
1854
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  The prerequisite check of this tasklet only accepts DRBD8-based
  instances that have exactly one secondary node.

  Note: Locking is not within the scope of this class.

  """
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
             remote_node_uuid, disks, early_release, ignore_ipolicy):
  """Store the replacement request and reset the runtime state.

  @param lu: the logical unit this tasklet belongs to
  @param instance_uuid: UUID of the instance whose disks are replaced
  @param instance_name: name of that instance
  @param mode: the disk replacement mode
  @param iallocator_name: iallocator to use, or None
  @param remote_node_uuid: UUID of the requested new node, or None
  @param disks: the disks to replace
  @param early_release: whether to use early-release mode
  @param ignore_ipolicy: passed as C{ignore} to the instance policy check

  """
  Tasklet.__init__(self, lu)

  # Runtime data, all unset until the prerequisite check runs
  self.instance = None
  self.new_node_uuid = None
  self.target_node_uuid = None
  self.other_node_uuid = None
  self.remote_node_info = None
  self.node_secondary_ip = None

  # Parameters: the instance to work on
  self.instance_uuid = instance_uuid
  self.instance_name = instance_name

  # Parameters: what to replace and where to
  self.mode = mode
  self.disks = disks
  self.iallocator_name = iallocator_name
  self.remote_node_uuid = remote_node_uuid

  # Parameters: behaviour switches
  self.early_release = early_release
  self.ignore_ipolicy = ignore_ipolicy
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
                  relocate_from_node_uuids):
  """Ask an iallocator for a new secondary node.

  @param lu: logical unit providing C{cfg}, C{rpc} and C{LogInfo}
  @param iallocator_name: name of the iallocator to run
  @param instance_uuid: UUID of the instance to relocate
  @param relocate_from_node_uuids: the nodes to relocate from
  @return: the UUID of the node selected by the iallocator
  @raise errors.OpPrereqError: if the iallocator run is not successful
      or the selected node is not found in the configuration

  """
  request = iallocator.IAReqRelocate(
    inst_uuid=instance_uuid,
    relocate_from_node_uuids=list(relocate_from_node_uuids))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if not allocator.success:
    failure = ("Can't compute nodes using iallocator '%s':"
               " %s" % (iallocator_name, allocator.info))
    raise errors.OpPrereqError(failure, errors.ECODE_NORES)

  # the first entry of the result is the name of the selected node
  chosen_name = allocator.result[0]
  chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)
  if chosen_node is None:
    raise errors.OpPrereqError("Node %s not found in configuration" %
                               chosen_name, errors.ECODE_NOENT)

  lu.LogInfo("Selected new secondary for instance '%s': %s",
             instance_uuid, chosen_name)

  return chosen_node.uuid
1916
def _FindFaultyDisks(self, node_uuid):
  """Call L{FindFaultyInstanceDisks} for this tasklet's instance.

  @param node_uuid: the node on which to look for faulty disks
  @return: whatever L{FindFaultyInstanceDisks} returns

  """
  faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)
  return faulty
1923
def _CheckDisksActivated(self, instance):
  """Tell whether every disk of the instance is found on all its nodes.

  Results flagged as offline are skipped; any other result with a
  failure message or an empty payload means the disks are not activated.

  @param instance: The instance to check disks
  @return: True if they are activated, False otherwise

  """
  all_node_uuids = instance.all_nodes

  for disk_idx, disk in enumerate(instance.disks):
    for node_uuid in all_node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", disk_idx,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, disk, instance)

      if not found.offline and (found.fail_msg or not found.payload):
        return False

  return True
1946
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster, is DRBD8-based and
  has exactly one secondary node. Depending on the replacement mode it
  then fills in C{self.disks}, C{self.target_node_uuid},
  C{self.other_node_uuid} and C{self.new_node_uuid}, verifies that the
  involved nodes are online, releases the locks that are no longer
  needed and records the secondary IP addresses of the touched nodes in
  C{self.node_secondary_ip}.

  @raise errors.OpPrereqError: if the instance layout, the requested
      node, the disk list or the disk state do not allow the operation
  @raise errors.ProgrammerError: if the replacement mode is unknown

  """
  self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
  assert self.instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  if self.instance.disk_template != constants.DT_DRBD8:
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  if len(self.instance.secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(self.instance.secondary_nodes),
                               errors.ECODE_FAULT)

  secondary_node_uuid = self.instance.secondary_nodes[0]

  # The new node is either the one given explicitly or the one chosen by
  # the iallocator
  if self.iallocator_name is None:
    remote_node_uuid = self.remote_node_uuid
  else:
    remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                          self.instance.uuid,
                                          self.instance.secondary_nodes)

  if remote_node_uuid is None:
    self.remote_node_info = None
  else:
    assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
      "Remote node '%s' is not locked" % remote_node_uuid

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node_uuid

  if remote_node_uuid == self.instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node_uuid == secondary_node_uuid:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  # An explicit disk list is rejected in the automatic and change-node modes
  if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                  constants.REPLACE_DISK_CHG):
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(self.instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)
    faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
    faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

    if faulty_primary and faulty_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    # The node holding the faulty disks becomes the target, its peer is
    # the "other" node
    if faulty_primary:
      self.disks = faulty_primary
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    elif faulty_secondary:
      self.disks = faulty_secondary
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    else:
      # Nothing is faulty; the empty disk list makes Exec a no-op
      self.disks = []
      check_nodes = []

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node_uuid = remote_node_uuid
      self.other_node_uuid = self.instance.primary_node
      self.target_node_uuid = secondary_node_uuid
      # The old secondary (target) is deliberately not in this list, so
      # it is not required to be online
      check_nodes = [self.new_node_uuid, self.other_node_uuid]

      CheckNodeNotDrained(self.lu, remote_node_uuid)
      CheckNodeVmCapable(self.lu, remote_node_uuid)

      old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
      assert old_node_info is not None
      if old_node_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        # NOTE(review): the message is given the node UUID, not a node name
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode", secondary_node_uuid)

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
                           self.remote_node_info, self.cfg,
                           ignore=self.ignore_ipolicy)

  for node_uuid in check_nodes:
    CheckNodeOnline(self.lu, node_uuid)

  # Nodes that take part in the operation; unset entries are dropped
  touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                        self.other_node_uuid,
                                                        self.target_node_uuid]
                            if node_uuid is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    self.instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                in self.cfg.GetMultiNodeInfo(touched_nodes))
2100
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler:
  L{_ExecDrbd8Secondary} when a new node was selected, otherwise
  L{_ExecDrbd8DiskOnly}. If the instance's disks are not active they are
  started before the replacement and shut down again afterwards, also
  when the handler raises.

  @param feedback_fn: function used to report progress to the caller
  @return: the handler's return value, or None if there are no disks to
      replace

  """
  if __debug__:
    # Verify owned locks before starting operation
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(owned_nodes) == set(self.node_secondary_ip), \
        ("Incorrect node locks, owning %s, expected %s" %
         (owned_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))
    assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(owned_instances) == [self.instance_name], \
        "Instance '%s' not locked" % self.instance_name

    assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
        "Should not own any node group lock at this point"

  # An empty disk list means there is nothing to do
  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(self.cfg.GetNodeNames(
                                self.instance.secondary_nodes)))

  # Remember whether we start the disks, so only then they are shut down
  activate_disks = not self.instance.disks_active

  # Activate the instance disks if we're replacing them on a down instance
  if activate_disks:
    StartInstanceDisks(self.lu, self.instance, True)

  try:
    # Should we replace the secondary node?
    if self.new_node_uuid is not None:
      fn = self._ExecDrbd8Secondary
    else:
      fn = self._ExecDrbd8DiskOnly

    result = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a
    # down instance
    if activate_disks:
      _SafeShutdownInstanceDisks(self.lu, self.instance)

  # The handler is expected to have released all node locks
  assert not self.lu.owned_locks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    nodes = frozenset(self.node_secondary_ip)
    assert ((self.early_release and not owned_nodes) or
            (not self.early_release and not (set(owned_nodes) - nodes))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, owned_nodes, nodes))

  return result
2169
def _CheckVolumeGroup(self, node_uuids):
  """Make sure the cluster volume group exists on all given nodes.

  @param node_uuids: the nodes to check
  @raise errors.OpExecError: if the volume groups cannot be listed or
      the volume group is missing on one of the nodes

  """
  self.lu.LogInfo("Checking volume groups")

  wanted_vg = self.cfg.GetVGName()

  vg_lists = self.rpc.call_vg_list(node_uuids)
  if not vg_lists:
    raise errors.OpExecError("Can't list volume groups on the nodes")

  for node_uuid in node_uuids:
    node_result = vg_lists[node_uuid]
    node_result.Raise("Error checking node %s" %
                      self.cfg.GetNodeName(node_uuid))

    if wanted_vg in node_result.payload:
      continue

    raise errors.OpExecError("Volume group '%s' not found on node %s" %
                             (wanted_vg, self.cfg.GetNodeName(node_uuid)))
2186
def _CheckDisksExistence(self, node_uuids):
  """Check that the disks to be replaced are found on the given nodes.

  Only disks whose index is listed in C{self.disks} are looked at.

  @param node_uuids: the nodes on which every selected disk must exist
  @raise errors.OpExecError: if a disk cannot be found on a node; the
      message carries an extra hint when the instance's disks do not
      seem to be activated

  """
  for disk_idx, disk in enumerate(self.instance.disks):
    if disk_idx in self.disks:
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", disk_idx,
                        self.cfg.GetNodeName(node_uuid))

        found = _BlockdevFind(self, node_uuid, disk, self.instance)

        problem = found.fail_msg
        if not problem and found.payload:
          # the disk is there, go on with the next node
          continue

        problem = problem or "disk not found"

        if self._CheckDisksActivated(self.instance):
          extra_hint = ""
        else:
          extra_hint = ("\nDisks seem to be not properly activated. Try"
                        " running activate-disks on the instance before"
                        " using replace-disks.")

        raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                 (disk_idx, self.cfg.GetNodeName(node_uuid),
                                  problem, extra_hint))
2212
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
  """Check the consistency of the disks to be replaced on one node.

  Only disks whose index is listed in C{self.disks} are looked at.

  @param node_uuid: the node on which the disks are checked
  @param on_primary: passed through to L{CheckDiskConsistency}
  @param ldisk: passed through to L{CheckDiskConsistency}
  @raise errors.OpExecError: if L{CheckDiskConsistency} fails for a disk

  """
  for disk_idx, disk in enumerate(self.instance.disks):
    if disk_idx in self.disks:
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (disk_idx, self.cfg.GetNodeName(node_uuid)))

      consistent = CheckDiskConsistency(self.lu, self.instance, disk,
                                        node_uuid, on_primary, ldisk=ldisk)
      if not consistent:
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))
2227
def _CreateNewStorage(self, node_uuid):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every disk whose index is listed in C{self.disks} a new data and a
  new meta volume with freshly generated names are created on the node.

  @param node_uuid: the node on which the new volumes are created
  @return: dictionary mapping each handled disk's C{iv_name} to a tuple
      of (annotated disk, copies of its old children, new volumes)
  @raise errors.OpExecError: if a volume cannot be created

  """
  iv_names = {}

  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d",
                    self.cfg.GetNodeName(node_uuid), idx)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    # The new volumes reuse the volume group (first element of the
    # logical_id) and the params of the child they will replace
    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    # Copies, so that callers can modify them without touching the
    # children of the annotated disk
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError, e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2274
def _CheckDevices(self, node_uuid, iv_names):
  """Check that all handled DRBD devices are found and not degraded.

  @param node_uuid: the node on which the devices are looked up
  @param iv_names: dictionary whose values are 3-tuples with the device
      to check as first element
  @raise errors.OpExecError: if a device cannot be found or is degraded

  """
  for name, (drbd_dev, _, _) in iv_names.iteritems():
    found = _BlockdevFind(self, node_uuid, drbd_dev, self.instance)

    problem = found.fail_msg
    if not problem and found.payload:
      if found.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
      continue

    raise errors.OpExecError("Can't find DRBD device %s: %s" %
                             (name, problem or "disk not found"))
2288
def _RemoveOldStorage(self, node_uuid, iv_names):
  """Remove the old volumes of all handled disks from a node.

  Removal failures are only logged as warnings.

  @param node_uuid: the node from which the volumes are removed
  @param iv_names: dictionary whose values are 3-tuples with the list of
      old volumes as second element

  """
  for name, (_, stale_lvs, _) in iv_names.iteritems():
    self.lu.LogInfo("Remove logical volumes for %s", name)

    for stale_lv in stale_lvs:
      removal = self.rpc.call_blockdev_remove(node_uuid,
                                              (stale_lv, self.instance))
      failure = removal.fail_msg
      if failure:
        self.lu.LogWarning("Can't remove old LV: %s", failure,
                           hint="remove unused LVs manually")
2299
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: feedback callback (not used by this method)
  @raise errors.OpExecError: if the new LVs cannot be attached to the
      drbd device; failures of the other steps are raised by the called
      helpers and by the C{Raise} method of the RPC results

  """
  steps_total = 6

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
  self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(
    self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
    False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node_uuid)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                   (dev, self.instance),
                                                   (old_lvs, self.instance))
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" %
                 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == unique_id on that node

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.logical_id[0],
                              d.logical_id[1] + "_replaced-%s" % suff)

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                           (to_ren, self.instance))
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.logical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s",
                    self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                (dev, self.instance),
                                                (new_lvs, self.instance))
    msg = result.fail_msg
    if msg:
      # Best-effort rollback: try to remove the new LVs again, then fail
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                             (new_lv, self.instance)).fail_msg
        if msg2:
          # The two literals are concatenated, hence the leading space
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # The remaining two steps are numbered 5 and 6; their order depends on
  # the early-release mode
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2445
2446 - def _ExecDrbd8Secondary(self, feedback_fn):
2447 """Replace the secondary node for DRBD 8. 2448 2449 The algorithm for replace is quite complicated: 2450 - for all disks of the instance: 2451 - create new LVs on the new node with same names 2452 - shutdown the drbd device on the old secondary 2453 - disconnect the drbd network on the primary 2454 - create the drbd device on the new secondary 2455 - network attach the drbd on the primary, using an artifice: 2456 the drbd code for Attach() will connect to the network if it 2457 finds a device which is connected to the good local disks but 2458 not network enabled 2459 - wait for sync across all devices 2460 - remove all disks from the old secondary 2461 2462 Failures are not very well handled. 2463 2464 """ 2465 steps_total = 6 2466 2467 pnode = self.instance.primary_node 2468 2469 # Step: check device activation 2470 self.lu.LogStep(1, steps_total, "Check device existence") 2471 self._CheckDisksExistence([self.instance.primary_node]) 2472 self._CheckVolumeGroup([self.instance.primary_node]) 2473 2474 # Step: check other node consistency 2475 self.lu.LogStep(2, steps_total, "Check peer consistency") 2476 self._CheckDisksConsistency(self.instance.primary_node, True, True) 2477 2478 # Step: create new storage 2479 self.lu.LogStep(3, steps_total, "Allocate new storage") 2480 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) 2481 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, 2482 self.new_node_uuid) 2483 for idx, dev in enumerate(disks): 2484 self.lu.LogInfo("Adding new local storage on %s for disk/%d" % 2485 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2486 # we pass force_create=True to force LVM creation 2487 for new_lv in dev.children: 2488 try: 2489 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance, 2490 new_lv, True, GetInstanceInfoText(self.instance), 2491 False, excl_stor) 2492 except errors.DeviceCreationError, e: 2493 raise errors.OpExecError("Can't create block device: %s" % e.message) 2494 2495 # 
Step 4: dbrd minors and drbd setups changes 2496 # after this, we must manually remove the drbd minors on both the 2497 # error and the success paths 2498 self.lu.LogStep(4, steps_total, "Changing drbd configuration") 2499 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid 2500 for _ in self.instance.disks], 2501 self.instance.uuid) 2502 logging.debug("Allocated minors %r", minors) 2503 2504 iv_names = {} 2505 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): 2506 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % 2507 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2508 # create new devices on new_node; note that we create two IDs: 2509 # one without port, so the drbd will be activated without 2510 # networking information on the new node at this stage, and one 2511 # with network, for the latter activation in step 4 2512 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id 2513 if self.instance.primary_node == o_node1: 2514 p_minor = o_minor1 2515 else: 2516 assert self.instance.primary_node == o_node2, "Three-node instance?" 
2517 p_minor = o_minor2 2518 2519 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None, 2520 p_minor, new_minor, o_secret) 2521 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port, 2522 p_minor, new_minor, o_secret) 2523 2524 iv_names[idx] = (dev, dev.children, new_net_id) 2525 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, 2526 new_net_id) 2527 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8, 2528 logical_id=new_alone_id, 2529 children=dev.children, 2530 size=dev.size, 2531 params={}) 2532 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd], 2533 self.cfg) 2534 try: 2535 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance, 2536 anno_new_drbd, 2537 GetInstanceInfoText(self.instance), False, 2538 excl_stor) 2539 except errors.GenericError: 2540 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2541 raise 2542 2543 # We have new devices, shutdown the drbd on the old secondary 2544 for idx, dev in enumerate(self.instance.disks): 2545 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) 2546 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid, 2547 (dev, self.instance)).fail_msg 2548 if msg: 2549 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" 2550 "node: %s" % (idx, msg), 2551 hint=("Please cleanup this device manually as" 2552 " soon as possible")) 2553 2554 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") 2555 result = self.rpc.call_drbd_disconnect_net( 2556 [pnode], (self.instance.disks, self.instance))[pnode] 2557 2558 msg = result.fail_msg 2559 if msg: 2560 # detaches didn't succeed (unlikely) 2561 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2562 raise errors.OpExecError("Can't detach the disks from the network on" 2563 " old node: %s" % (msg,)) 2564 2565 # if we managed to detach at least one, we update all the disks of 2566 # the instance to point to the new secondary 2567 self.lu.LogInfo("Updating instance 
configuration") 2568 for dev, _, new_logical_id in iv_names.itervalues(): 2569 dev.logical_id = new_logical_id 2570 2571 self.cfg.Update(self.instance, feedback_fn) 2572 2573 # Release all node locks (the configuration has been updated) 2574 ReleaseLocks(self.lu, locking.LEVEL_NODE) 2575 2576 # and now perform the drbd attach 2577 self.lu.LogInfo("Attaching primary drbds to new secondary" 2578 " (standalone => connected)") 2579 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, 2580 self.new_node_uuid], 2581 (self.instance.disks, self.instance), 2582 self.instance.name, 2583 False) 2584 for to_node, to_result in result.items(): 2585 msg = to_result.fail_msg 2586 if msg: 2587 raise errors.OpExecError( 2588 "Can't attach drbd disks on node %s: %s (please do a gnt-instance " 2589 "info %s to see the status of disks)" % 2590 (self.cfg.GetNodeName(to_node), msg, self.instance.name)) 2591 2592 cstep = itertools.count(5) 2593 2594 if self.early_release: 2595 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2596 self._RemoveOldStorage(self.target_node_uuid, iv_names) 2597 # TODO: Check if releasing locks early still makes sense 2598 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) 2599 else: 2600 # Release all resource locks except those used by the instance 2601 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, 2602 keep=self.node_secondary_ip.keys()) 2603 2604 # TODO: Can the instance lock be downgraded here? Take the optional disk 2605 # shutdown in the caller into consideration. 
2606 2607 # Wait for sync 2608 # This can fail as the old devices are degraded and _WaitForSync 2609 # does a combined result over all disks, so we don't check its return value 2610 self.lu.LogStep(cstep.next(), steps_total, "Sync devices") 2611 WaitForSync(self.lu, self.instance) 2612 2613 # Check all devices manually 2614 self._CheckDevices(self.instance.primary_node, iv_names) 2615 2616 # Step: remove old storage 2617 if not self.early_release: 2618 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2619 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2620