Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with storage of instances.""" 
  32   
  33  import itertools 
  34  import logging 
  35  import os 
  36  import time 
  37   
  38  from ganeti import compat 
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import ht 
  42  from ganeti import locking 
  43  from ganeti.masterd import iallocator 
  44  from ganeti import objects 
  45  from ganeti import utils 
  46  import ganeti.rpc.node as rpc 
  47  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  48  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  49    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  50    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  51    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  52    CheckDiskTemplateEnabled 
  53  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  54    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  55    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  56   
  57  import ganeti.masterd.instance 
  58   
  59   
  60  _DISK_TEMPLATE_NAME_PREFIX = { 
  61    constants.DT_PLAIN: "", 
  62    constants.DT_RBD: ".rbd", 
  63    constants.DT_EXT: ".ext", 
  64    constants.DT_FILE: ".file", 
  65    constants.DT_SHARED_FILE: ".sharedfile", 
  66    } 
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create one block device on a node, without touching its children.

  Children of the device are not handled here, so they must already have
  been created by the caller.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  create_args = (node_uuid, (device, instance), device.size, instance.name,
                 force_open, info, excl_stor)
  rpc_result = lu.rpc.call_blockdev_create(*create_args)

  # The failure message is only used if the RPC result reports an error
  failure_msg = ("Can't create block device %s on"
                 " node %s for instance %s" %
                 (device, lu.cfg.GetNodeName(node_uuid), instance.name))
  rpc_result.Raise(failure_msg)
100
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices, as (node_uuid, device) tuples
  @raise errors.DeviceCreationError: if creating this device or one of its
      children fails; the exception carries the devices created so far

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError as e:
    # A child failed: add what this level created so the caller can undo it.
    # The bare raise keeps the traceback of the original failure.
    e.created_devices.extend(created_devices)
    raise
  except errors.OpExecError as e:
    raise errors.DeviceCreationError(str(e), created_devices)
161
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage applies to the node with the given UUID.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)

  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)
180
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Annotate the root device, then delegate to L{_CreateBlockDevInner}.

  The exclusive_storage setting of the target node is looked up here and
  passed down together with the annotated disk.

  @return: the list of created devices, as returned by
      L{_CreateBlockDevInner}

  """
  annotated_disks = AnnotateDiskParams(instance, [device], lu.cfg)
  # Exactly one disk went in, so exactly one is expected back
  (root_disk,) = annotated_disks
  node_excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, root_disk,
                              force_create, info, force_open, node_excl_stor)
193
def _UndoCreateDisks(lu, disks_created, instance):
  """Remove the disks that L{CreateDisks} managed to create.

  This is the error path counterpart of L{CreateDisks}. Removal failures
  are only logged as warnings.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for created in disks_created:
    (node_uuid, disk) = created
    removal = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    warning_msg = ("Failed to remove newly-created disk %s on node %s" %
                   (disk, lu.cfg.GetNodeName(node_uuid)))
    removal.Warn(warning_msg, logging.warning)
212
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpExecError: if creating a device fails; the devices created
      up to that point are removed again before raising

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    # The storage directory is taken from the path of the first disk
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Creation and opening are only forced on the primary node
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError as e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Also roll back the partially created device tree of this disk
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created
274
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @param disk_template: the disk template of the instance
  @param disks: list of disk definition dicts, each providing at least
      C{constants.IDISK_VG} and C{constants.IDISK_SIZE}
  @rtype: dict
  @return: required size per volume group name; empty for templates
      which do not use volume groups
  @raise errors.ProgrammerError: if the size requirement of the disk
      template is unknown

  """
  def _compute(disks, payload):
    """Universal algorithm.

    Sums, per volume group, the size of every disk plus C{payload}.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # Accumulate under the disk's own VG so that several disks on the
      # same volume group add up
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    constants.DT_GLUSTER: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def ComputeDisks(op, default_vg):
  """Validate the opcode's disk definitions and normalize them.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks
  @raise errors.OpPrereqError: on an invalid access mode, a missing or
      non-integer size, a provider given for a non-ext template, or a
      missing provider for the ext template

  """
  optional_keys = [
    constants.IDISK_METAVG,
    constants.IDISK_ADOPT,
    constants.IDISK_SPINDLES,
    ]

  computed = []
  for spec in op.disks:
    access_mode = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if access_mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 access_mode, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      disk_size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    provider = spec.get(constants.IDISK_PROVIDER, None)
    if provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    # The textual "none" value stands for "no name"
    disk_name = spec.get(constants.IDISK_NAME, None)
    if disk_name is not None and disk_name.lower() == constants.VALUE_NONE:
      disk_name = None

    normalized = {
      constants.IDISK_SIZE: disk_size,
      constants.IDISK_MODE: access_mode,
      constants.IDISK_VG: spec.get(constants.IDISK_VG, default_vg),
      constants.IDISK_NAME: disk_name,
      }

    # Copy the optional parameters only if the user specified them
    normalized.update((key, spec[key]) for key in optional_keys
                      if key in spec)

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      normalized[constants.IDISK_PROVIDER] = provider
      for key in spec:
        if key not in constants.IDISK_PARAMS:
          normalized[key] = spec[key]

    computed.append(normalized)

  return computed
374
def CheckRADOSFreeSpace():
  """Check the free space inside the RADOS cluster.

  This is a no-op: the RADOS cluster is assumed to always have enough
  space, so nothing is computed or verified.

  """
  return None
382
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Build a DRBD8 disk object together with its data and metadata LVs.

  @param lu: the logical unit on whose behalf we execute
  @param primary_uuid: UUID of the primary node
  @param secondary_uuid: UUID of the secondary node
  @param size: size of the DRBD device and of its data volume
  @param vgnames: pair of volume group names (data, metadata)
  @param names: pair of logical volume names (data, metadata)
  @param iv_name: the iv_name to set on the DRBD device
  @param p_minor: DRBD minor on the primary node
  @param s_minor: DRBD minor on the secondary node
  @return: the L{objects.Disk} of type DRBD8, with both LVs as children

  """
  assert len(vgnames) == len(names) == 2

  def _NewUuid():
    """Allocate a fresh unique ID within the current execution context."""
    return lu.cfg.GenerateUniqueID(lu.proc.GetECId())

  def _NewPlainLV(vg_name, lv_name, lv_size):
    """Build a plain LV disk object and give it a UUID."""
    lv_disk = objects.Disk(dev_type=constants.DT_PLAIN, size=lv_size,
                           logical_id=(vg_name, lv_name),
                           params={})
    lv_disk.uuid = _NewUuid()
    return lv_disk

  drbd_port = lu.cfg.AllocatePort()
  drbd_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  data_lv = _NewPlainLV(vgnames[0], names[0], size)
  meta_lv = _NewPlainLV(vgnames[1], names[1], constants.DRBD_META_SIZE)

  drbd_logical_id = (primary_uuid, secondary_uuid, drbd_port,
                     p_minor, s_minor, drbd_secret)
  drbd_disk = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                           logical_id=drbd_logical_id,
                           children=[data_lv, meta_lv],
                           iv_name=iv_name, params={})
  drbd_disk.uuid = _NewUuid()
  return drbd_disk
410
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @param template_name: the disk template to generate disks for
  @param instance_uuid: UUID of the owning instance; used for the DRBD
      minor allocation and for Gluster volume paths
  @param primary_node_uuid: UUID of the primary node
  @param secondary_node_uuids: list of secondary node UUIDs; must hold
      exactly one entry for DRBD8 and be empty for every other template
  @param disk_info: list of disk definition dicts using the
      C{constants.IDISK_*} keys
  @param file_storage_dir: directory in which file-based disks are placed
  @param file_driver: first element of the logical ID of file-based and
      Gluster disks
  @param base_index: index of the first generated disk
  @param feedback_fn: called with a progress message for every generated
      disk (not used for DRBD8 and diskless)
  @param full_disk_params: disk parameters handed to
      L{objects.Disk.ComputeLDParams} for DRBD8
  @return: list of L{objects.Disk} objects, empty for diskless
  @raise errors.ProgrammerError: on a wrong number of secondary nodes, an
      unknown disk template or a missing ext provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    # Two minors per disk: one on the primary, one on the secondary
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # names holds, per disk, the data LV name followed by the meta LV name
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    # Only templates listed in _DISK_TEMPLATE_NAME_PREFIX get generated names
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Every logical_id_fn takes (idx, disk_index, disk): idx is the position
    # in disk_info (and in names), disk_index additionally includes
    # base_index
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      # Use the idx argument instead of relying on a variable of the
      # enclosing scope happening to hold the right value at call time
      logical_id_fn = \
        lambda idx, _, disk: (file_driver,
                              "%s/%s" % (file_storage_dir,
                                         names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Verify that spindles are given exactly when exclusive_storage allows.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not
      be, or when they are required but missing

  """
  # A key that is present but set to None counts as "not specified"
  has_spindles = diskdict.get(constants.IDISK_SPINDLES) is not None

  if has_spindles and not es_flag:
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)

  if es_flag and required and not has_spindles:
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
550
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  Optionally the disks can be recreated on a different set of nodes, given
  either explicitly or chosen by an iallocator.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters which may be changed while recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Ask the iallocator named in the opcode for new nodes.

    On success the selected nodes are stored in C{self.op.node_uuids} and
    C{self.op.nodes}.

    @raise errors.OpPrereqError: if the allocator cannot compute nodes

    """
    instance = self.instance
    be_full = self.cfg.GetClusterInfo().FillBE(instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disk_specs = []
    for inst_disk in instance.disks:
      disk_specs.append({
        constants.IDISK_SIZE: inst_disk.size,
        constants.IDISK_MODE: inst_disk.mode,
        constants.IDISK_SPINDLES: inst_disk.spindles,
        })
    request = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                            disk_template=disk_template,
                                            tags=list(instance.GetTags()),
                                            os=instance.os,
                                            nics=[{}],
                                            vcpus=be_full[constants.BE_VCPUS],
                                            memory=be_full[constants.BE_MAXMEM],
                                            spindle_use=spindle_use,
                                            disks=disk_specs,
                                            hypervisor=instance.hypervisor,
                                            node_whitelist=None)
    allocator = iallocator.IAllocator(self.cfg, self.rpc, request)

    allocator.Run(self.op.iallocator)

    assert request.RequiredNodes() == len(self.instance.all_nodes)

    if not allocator.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, allocator.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self,
                                                         allocator.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Normalize the disk list and validate the opcode arguments.

    @raise errors.OpPrereqError: on duplicate disk indices or on an attempt
        to change a parameter which is not modifiable

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      unique_indices = sorted(frozenset(self.op.disks))
      self.op.disks = [(idx, {}) for idx in unique_indices]

    requested_indices = [compat.fst(entry) for entry in self.op.disks]
    duplicates = utils.FindDuplicates(requested_indices)
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance and declare the initially needed node locks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      primary_groups = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid,
                                       primary_only=True)
      self.needed_locks[locking.LEVEL_NODEGROUP] = primary_groups

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        node_locks = self.needed_locks[locking.LEVEL_NODE]
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          node_locks.extend(self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)

    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      node_locks = self.needed_locks[locking.LEVEL_NODE]
      self.needed_locks[locking.LEVEL_NODE_RES] = CopyLockList(node_locks)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    The same list (master node plus all instance nodes) is returned for
    both elements of the result.

    """
    nl = [self.cfg.GetMasterNode()]
    nl.extend(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if self.op.node_uuids:
      new_node_count = len(self.op.node_uuids)
      if new_node_count != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node

    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    skip_state_check = ((self.op.iallocator or self.op.node_uuids) and
                        old_pnode.offline)
    if not skip_state_check:
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    # Map of disk index to the parameter changes requested for that disk
    disk_count = len(instance.disks)
    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(disk_count))

    maxidx = max(self.disks)
    if maxidx >= disk_count:
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
        sorted(self.disks) != list(range(disk_count))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      for lock_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
        ReleaseLocks(self, lock_level, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    es_by_node = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
    excl_stor = compat.any(es_by_node.values())
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.disks:
        # Disk should not be recreated
        to_skip.append(idx)
        continue
      changes = self.disks[idx]

      # update secondaries for disks, if needed
      new_id = None
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for (idx, new_id, changes) in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    if self.op.node_uuids:
      # change primary node, if needed
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = []
      for (idx, disk) in enumerate(self.instance.disks):
        if idx not in to_skip:
          wipedisks.append((idx, disk, 0))
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)
876
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Builds the arguments for a node info call and executes it.

  The call asks for the LVM volume group C{vg} as the only storage unit and
  for the cluster's hypervisor type with its cluster-level parameters.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the node info RPC call

  """
  storage_units = rpc.PrepareStorageUnitsForNodes(
    lu.cfg, [(constants.ST_LVM_VG, vg)], node_uuids)
  hv_name = lu.cfg.GetHypervisorType()
  cluster_hvparams = lu.cfg.GetClusterInfo().hvparams
  hv_specs = [(hv_name, cluster_hvparams[hv_name])]
  return lu.rpc.call_node_info(node_uuids, storage_units, hv_specs)
897
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    # Carry an error code like the other two failures raised below, so that
    # all prerequisite errors of this check are classified
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    # A missing or non-integer value means the free space is unknown
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)
929
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Verifies that every given node has enough free space in one VG.

  A single node info call is made for all nodes; afterwards each node's
  answer is validated and its free space compared against the requested
  amount.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  results = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    node_result = results[uuid]
    node_result.Raise("Cannot get current information from node %s" % name,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, node_result.payload, vg, requested)
958
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Verifies that the nodes have enough free space in each requested VG.

  Every (volume group, size) pair of C{req_sizes} is checked on all given
  nodes; the first failing check raises.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for (vg_name, needed_mib) in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, needed_mib)
981
982 983 -def _DiskSizeInBytesToMebibytes(lu, size):
984 """Converts a disk size in bytes to mebibytes. 985 986 Warns and rounds up if the size isn't an even multiple of 1 MiB. 987 988 """ 989 (mib, remainder) = divmod(size, 1024 * 1024) 990 991 if remainder != 0: 992 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 993 " to not overwrite existing data (%s bytes will not be" 994 " wiped)", (1024 * 1024) - remainder) 995 mib += 1 996 997 return mib
998
999 1000 -def _CalcEta(time_taken, written, total_size):
1001 """Calculates the ETA based on size written and total size. 1002 1003 @param time_taken: The time taken so far 1004 @param written: amount written so far 1005 @param total_size: The total size of data to be written 1006 @return: The remaining time in seconds 1007 1008 """ 1009 avg_time = time_taken / float(written) 1010 return (total_size - written) * avg_time
1011
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node before wiping and
  resumed afterwards, also when wiping fails. A failure to resume is only
  logged as a warning.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from offset 0

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      size = device.size
      # Remember where wiping started; the ETA must only consider the data
      # written by this run, not the part of the disk that was skipped
      start_offset = offset

      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # Never go below 1, as a chunk size of 0 (very small disks) would make
      # the loop below spin forever without advancing the offset.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)))

      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once a minute (last_output starts at 0, so
        # the first chunk is always reported)
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1105
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Runs L{WipeDisks} and undoes the disk creation if wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
    case of error
  @raise errors.OpExecError: re-raised from L{WipeDisks} after the cleanup
    has been attempted

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    # Remove what was created before handing the error to the caller
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise
1127
def ExpandCheckDisks(instance, disks):
  """Resolves a disk selection against the disks of an instance.

  @param instance: the instance owning the disks
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks; None selects all disks of the instance
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on
  @raise errors.ProgrammerError: if a selected disk is not one of the
    instance's disks

  """
  if disks is None:
    return instance.disks

  if set(disks).issubset(instance.disks):
    return disks

  raise errors.ProgrammerError("Can only act on disks belonging to the"
                               " target instance: expected a subset of %r,"
                               " got %r" % (instance.disks, disks))
1146
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  The mirror status is queried on the instance's primary node until no
  disk reports a sync percentage anymore (or only once, if C{oneshot} is
  set).

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are polled
  @param disks: the disks to poll; None means all disks of the instance
  @param oneshot: if true, stop after the first successful status query
    (apart from the retries done for degraded disks)
  @return: True if there is nothing to wait for or if no polled disk was
    found degraded without a sync in progress, False otherwise
  @raise errors.RemoteError: if the status query failed ten times in a row

  """
  # Nothing to wait for: the instance has no disks or an empty selection
  # was passed explicitly
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    # Per-iteration state, recomputed from every status query
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    # A successful query resets the counter of consecutive failures
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      # Only disks that are degraded without a running sync count here
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        # NOTE(review): max_time is overwritten by each syncing disk, so it
        # ends up holding the value of the last one rather than the maximum
        # -- confirm whether that is intended
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
          max_time = 5 # sleep at least a bit between retries
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # Wait for the estimated time, but poll at least once a minute
    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1221
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shuts down the block devices of an instance on all its nodes.

  A failure on the primary node makes the function return False unless
  C{ignore_primary} is true. A failure on any other node makes it return
  False unless the RPC result marks that node as offline. The instance's
  disks are only marked inactive in the configuration when all disks are
  shut down, i.e. when C{disks} is None.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down; None means all disks
  @param ignore_primary: if true, errors on the primary node do not affect
    the result
  @return: True if no relevant shutdown failure happened, False otherwise

  """
  success = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for (node_uuid, top_disk) in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      failure = result.fail_msg
      if not failure:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), failure)
      if node_uuid == instance.primary_node:
        if not ignore_primary:
          success = False
      elif not result.offline:
        success = False

  return success
1250
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shuts down the block devices of an instance after a state check.

  The instance state is first checked against C{INSTANCE_DOWN} through
  L{CheckInstanceState}; only afterwards L{ShutdownInstanceDisks} is
  called.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down; None means all disks

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1261
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  The devices are assembled in two passes: first on every node of each
  disk in secondary mode, then once more on the primary node only, in
  primary mode.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors during the first pass
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: (disks_ok, device_info); disks_ok is False if the operation
      failed, device_info is a list of
      (primary node name, disk iv_name, device path or None), one per disk

  """
  def _ForRpc(node_disk):
    # With ignore_size, send a copy of the disk which has its size unset
    if not ignore_size:
      return node_disk
    sizeless = node_disk.Copy()
    sizeless.UnsetSize()
    return sizeless

  all_ok = True
  mapping = []

  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(instance.uuid)

  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for (idx, inst_disk) in enumerate(disks):
    for (node_uuid, node_disk) in inst_disk.ComputeNodeTree(
          instance.primary_node):
      result = lu.rpc.call_blockdev_assemble(node_uuid,
                                             (_ForRpc(node_disk), instance),
                                             instance, False, idx)
      failure = result.fail_msg
      if not failure:
        continue

      is_offline_secondary = (node_uuid in instance.secondary_nodes and
                              result.offline)
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), failure)
      if not (ignore_secondaries or is_offline_secondary):
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for (idx, inst_disk) in enumerate(disks):
    dev_path = None

    for (node_uuid, node_disk) in inst_disk.ComputeNodeTree(
          instance.primary_node):
      if node_uuid != instance.primary_node:
        continue

      result = lu.rpc.call_blockdev_assemble(node_uuid,
                                             (_ForRpc(node_disk), instance),
                                             instance, True, idx)
      failure = result.fail_msg
      if failure:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid),
                      failure)
        all_ok = False
      else:
        # Only the first element of the payload is used here
        (dev_path, _, __) = result.payload

    mapping.append((lu.cfg.GetNodeName(instance.primary_node),
                    inst_disk.iv_name, dev_path))

  if not all_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return all_ok, mapping
1355
def StartInstanceDisks(lu, instance, force):
  """Assembles the disks of an instance, failing if that is not possible.

  If the disks cannot be assembled they are shut down again and an error
  is raised.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are started
  @param force: passed to L{AssembleInstanceDisks} as C{ignore_secondaries};
    if it is false (but not None), a hint about '--force' is logged on
    failure
  @raise errors.OpExecError: if the disks could not be assembled

  """
  (disks_ok, _) = AssembleInstanceDisks(lu, instance,
                                        ignore_secondaries=force)
  if disks_ok:
    return

  ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1370
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the node and node resource locks of the instance's nodes.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes are
    online, that its disk template can be grown and that the requested size
    change is not negative. It also computes C{self.delta} (the growth) and
    C{self.target} (the resulting size) and checks the free disk space.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # The amount is the new total size of the disk
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # The amount is an increment on top of the current size
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Check the free disk space on the nodes, where that is meaningful.

    The check is skipped for disk templates in
    C{constants.DTS_NO_FREE_SPACE_CHECK} and if any node uses exclusive
    storage.

    """
    template = self.instance.disk_template
    if (template not in constants.DTS_NO_FREE_SPACE_CHECK and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The grow request is first sent to all nodes in dry-run mode, then run
    for real on the backing storage of all nodes and finally on the logical
    storage of the primary node. If the cluster wipes disks, the newly added
    space is wiped afterwards.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Report the node by name, like all other messages of this method,
      # instead of printing its UUID
      primary_name = self.cfg.GetNodeName(self.instance.primary_node)

      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   primary_name)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % primary_name)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    # The old size is known exactly when wiping was requested
    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1566
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The actual work is delegated to a L{TLReplaceDisks} tasklet created in
  L{ExpandNames}.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    A new node or an iallocator is required when changing the secondary
    node and rejected in every other mode.

    """
    node_given = self.op.remote_node is not None
    iallocator_given = self.op.iallocator is not None

    if self.op.mode == constants.REPLACE_DISK_CHG:
      if not (node_given or iallocator_given):
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif node_given or iallocator_given:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance, prepare the node locks and create the tasklet.

    """
    self._ExpandAndLockInstance()

    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES,
                  locking.LEVEL_NODEGROUP):
      assert level not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is None:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
    else:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Compute the locks needed at the node group, node and resource level.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is None:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()
      else:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        member_uuids = []
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          for node_uuid in self.cfg.GetNodeGroup(group_uuid).members:
            member_uuids.append(node_uuid)
        self.needed_locks[locking.LEVEL_NODE] = member_uuids

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    old_secondary = self.cfg.GetNodeName(instance.secondary_nodes[0])
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": old_secondary,
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    The hooks run on the master node, the primary node and, if given, the
    new secondary node.

    """
    instance = self.replacer.instance
    nl = [self.cfg.GetMasterNode(), instance.primary_node]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the optimistically acquired node group locks before running
    the generic prerequisite checks.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)
1708
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the nodes of the instance.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its primary
    node is online.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = instance
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    @return: the device information computed by L{AssembleInstanceDisks}

    """
    (disks_ok, disks_info) = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      # Degraded disks must not stay marked as active
      self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
1752
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the nodes of the instance.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = instance
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    With the force option the disks are shut down directly, otherwise the
    variant doing a state check first is used.

    """
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, self.instance)
    else:
      ShutdownInstanceDisks(self, self.instance)
1787
1788 1789 -def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary, 1790 ldisk=False):
1791 """Check that mirrors are not degraded. 1792 1793 @attention: The device has to be annotated already. 1794 1795 The ldisk parameter, if True, will change the test from the 1796 is_degraded attribute (which represents overall non-ok status for 1797 the device(s)) to the ldisk (representing the local storage status). 1798 1799 """ 1800 result = True 1801 1802 if on_primary or dev.AssembleOnSecondary(): 1803 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance)) 1804 msg = rstats.fail_msg 1805 if msg: 1806 lu.LogWarning("Can't find disk on node %s: %s", 1807 lu.cfg.GetNodeName(node_uuid), msg) 1808 result = False 1809 elif not rstats.payload: 1810 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid)) 1811 result = False 1812 else: 1813 if ldisk: 1814 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 1815 else: 1816 result = result and not rstats.payload.is_degraded 1817 1818 if dev.children: 1819 for child in dev.children: 1820 result = result and _CheckDiskConsistencyInner(lu, instance, child, 1821 node_uuid, on_primary) 1822 1823 return result
1824
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Annotate a device and run L{_CheckDiskConsistencyInner} on it.

  @param lu: logical unit; its C{cfg} is used for the annotation
  @param instance: the instance object the device belongs to
  @param dev: the device to check
  @param node_uuid: UUID of the node to query
  @param on_primary: whether C{node_uuid} is the instance's primary node
  @param ldisk: passed through to L{_CheckDiskConsistencyInner}
  @return: the result of L{_CheckDiskConsistencyInner}

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # exactly one annotated disk is expected back
  (annotated_dev,) = annotated
  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node_uuid,
                                    on_primary, ldisk=ldisk)
1833
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # exactly one annotated disk is expected back
  (annotated_dev,) = annotated
  return lu.rpc.call_blockdev_find(node_uuid, (annotated_dev, instance))
1847
1848 1849 -def _GenerateUniqueNames(lu, exts):
1850 """Generate a suitable LV name. 1851 1852 This will generate a logical volume name for the given instance. 1853 1854 """ 1855 results = [] 1856 for val in exts: 1857 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 1858 results.append("%s%s" % (new_id, val)) 1859 return results
1860
1861 1862 -class TLReplaceDisks(Tasklet):
1863 """Replaces disks for an instance. 1864 1865 Note: Locking is not within the scope of this class. 1866 1867 """
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
             remote_node_uuid, disks, early_release, ignore_ipolicy):
  """Store the replace-disks parameters and reset the runtime state.

  """
  Tasklet.__init__(self, lu)

  # Runtime data; all of these are assigned later by CheckPrereq
  self.instance = None
  self.new_node_uuid = None
  self.target_node_uuid = None
  self.other_node_uuid = None
  self.remote_node_info = None
  self.node_secondary_ip = None

  # Parameters, stored unchanged
  self.instance_uuid = instance_uuid
  self.instance_name = instance_name
  self.mode = mode
  self.iallocator_name = iallocator_name
  self.remote_node_uuid = remote_node_uuid
  self.disks = disks
  self.early_release = early_release
  self.ignore_ipolicy = ignore_ipolicy
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
                  relocate_from_node_uuids):
  """Compute a new secondary node using an IAllocator.

  @param lu: logical unit providing C{cfg}, C{rpc} and C{LogInfo}
  @param iallocator_name: name of the iallocator to run
  @param instance_uuid: UUID of the instance to relocate
  @param relocate_from_node_uuids: UUIDs of the nodes to relocate from
  @return: UUID of the node named first in the allocator result
  @raise errors.OpPrereqError: if the allocator run was not successful or
      the node it named is not in the configuration

  """
  request = iallocator.IAReqRelocate(
    inst_uuid=instance_uuid,
    relocate_from_node_uuids=list(relocate_from_node_uuids))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if not allocator.success:
    raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                               " %s" % (iallocator_name, allocator.info),
                               errors.ECODE_NORES)

  # Only the first entry of the allocator result is used
  chosen_name = allocator.result[0]
  chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)
  if chosen_node is None:
    raise errors.OpPrereqError("Node %s not found in configuration" %
                               chosen_name, errors.ECODE_NOENT)

  lu.LogInfo("Selected new secondary for instance '%s': %s",
             instance_uuid, chosen_name)

  return chosen_node.uuid
1922
def _FindFaultyDisks(self, node_uuid):
  """Wrapper for L{FindFaultyInstanceDisks}.

  @param node_uuid: UUID of the node to inspect
  @return: whatever L{FindFaultyInstanceDisks} returns for this tasklet's
      instance on that node

  """
  faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)
  return faulty
1929
def _CheckDisksActivated(self, instance):
  """Checks if the instance disks are activated.

  Every disk is looked up on every node of the instance; results flagged
  as offline are skipped.

  @param instance: The instance to check disks
  @return: True if they are activated, False otherwise

  """
  all_node_uuids = instance.all_nodes

  for (disk_index, disk) in enumerate(instance.disks):
    for node_uuid in all_node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", disk_index,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, disk, instance)

      # A failed or empty lookup only counts when the result is not
      # flagged offline
      if not found.offline and (found.fail_msg or not found.payload):
        return False

  return True
1952
def CheckPrereq(self):
  """Check prerequisites.

  Besides checking that the instance is in the cluster, this works out the
  target, other and (optionally) new node for the requested replace mode
  and the list of disks to replace, verifies that the involved nodes are
  online, releases the locks which are no longer needed and records the
  secondary IP addresses of the touched nodes in C{self.node_secondary_ip}.

  @raise errors.OpPrereqError: if the instance is not DRBD8-based, does not
      have exactly one secondary node, the requested node is already the
      primary or secondary, disks were specified for a mode which does not
      accept them, or automatic mode cannot repair the instance
  @raise errors.ProgrammerError: if the replace mode is not handled

  """
  instance = self.cfg.GetInstanceInfo(self.instance_uuid)
  self.instance = instance
  assert instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  if instance.disk_template != constants.DT_DRBD8:
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  if len(instance.secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(instance.secondary_nodes),
                               errors.ECODE_FAULT)

  secondary_uuid = instance.secondary_nodes[0]

  # The remote node is either given explicitly or computed by an iallocator
  if self.iallocator_name is not None:
    remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                          instance.uuid,
                                          instance.secondary_nodes)
  else:
    remote_node_uuid = self.remote_node_uuid

  if remote_node_uuid is not None:
    assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
      "Remote node '%s' is not locked" % remote_node_uuid

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node_uuid
  else:
    self.remote_node_info = None

  if remote_node_uuid == instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node_uuid == secondary_uuid:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                  constants.REPLACE_DISK_CHG):
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)
    faulty_primary = self._FindFaultyDisks(instance.primary_node)
    faulty_secondary = self._FindFaultyDisks(secondary_uuid)

    if faulty_primary and faulty_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    if not (faulty_primary or faulty_secondary):
      # Nothing is faulty; Exec reports that no disks need replacement
      self.disks = []
      check_nodes = []
    else:
      # The node holding the faulty disks becomes the target
      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = instance.primary_node
        self.other_node_uuid = secondary_uuid
      else:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_uuid
        self.other_node_uuid = instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node_uuid = instance.primary_node
      self.other_node_uuid = secondary_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node_uuid = secondary_uuid
      self.other_node_uuid = instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node_uuid = remote_node_uuid
      self.other_node_uuid = instance.primary_node
      self.target_node_uuid = secondary_uuid
      check_nodes = [self.new_node_uuid, self.other_node_uuid]

      CheckNodeNotDrained(self.lu, remote_node_uuid)
      CheckNodeVmCapable(self.lu, remote_node_uuid)

      old_secondary_info = self.cfg.GetNodeInfo(secondary_uuid)
      assert old_secondary_info is not None
      if old_secondary_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode", secondary_uuid)

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(instance.disks))

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self.lu, ipolicy, instance,
                           self.remote_node_info, self.cfg,
                           ignore=self.ignore_ipolicy)

  for checked_uuid in check_nodes:
    CheckNodeOnline(self.lu, checked_uuid)

  involved = [self.new_node_uuid, self.other_node_uuid,
              self.target_node_uuid]
  touched_nodes = frozenset(uuid for uuid in involved if uuid is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict(
    (node_uuid, node.secondary_ip)
    for (node_uuid, node) in self.cfg.GetMultiNodeInfo(touched_nodes))
2106
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler:
  L{_ExecDrbd8Secondary} when a new node was selected, otherwise
  L{_ExecDrbd8DiskOnly}. If the instance disks are not active they are
  started before the replacement and shut down again afterwards.

  @param feedback_fn: function used to send feedback back to the caller
  @return: the result of the selected handler, or None if there are no
      disks to replace

  """
  if __debug__:
    # Verify owned locks before starting operation
    held_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(held_nodes) == set(self.node_secondary_ip), \
      ("Incorrect node locks, owning %s, expected %s" %
       (held_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))
    assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    held_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(held_instances) == [self.instance_name], \
      "Instance '%s' not locked" % self.instance_name

    assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
      "Should not own any node group lock at this point"

  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  secondary_names = self.cfg.GetNodeNames(self.instance.secondary_nodes)
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(secondary_names))

  # The disks only need starting (and stopping again afterwards) when we
  # are replacing them on a down instance
  need_activation = not self.instance.disks_active
  if need_activation:
    StartInstanceDisks(self.lu, self.instance, True)

  try:
    # Without a new node only the disks themselves are replaced
    if self.new_node_uuid is None:
      handler = self._ExecDrbd8DiskOnly
    else:
      handler = self._ExecDrbd8Secondary

    result = handler(feedback_fn)
  finally:
    if need_activation:
      _SafeShutdownInstanceDisks(self.lu, self.instance)

  assert not self.lu.owned_locks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    held_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    expected = frozenset(self.node_secondary_ip)
    assert ((self.early_release and not held_nodes) or
            (not self.early_release and not (set(held_nodes) - expected))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, held_nodes, expected))

  return result
2175
2176 - def _CheckVolumeGroup(self, node_uuids):
2177 self.lu.LogInfo("Checking volume groups") 2178 2179 vgname = self.cfg.GetVGName() 2180 2181 # Make sure volume group exists on all involved nodes 2182 results = self.rpc.call_vg_list(node_uuids) 2183 if not results: 2184 raise errors.OpExecError("Can't list volume groups on the nodes") 2185 2186 for node_uuid in node_uuids: 2187 res = results[node_uuid] 2188 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid)) 2189 if vgname not in res.payload: 2190 raise errors.OpExecError("Volume group '%s' not found on node %s" % 2191 (vgname, self.cfg.GetNodeName(node_uuid)))
2192
def _CheckDisksExistence(self, node_uuids):
  """Check that the disks to be replaced can be found on the given nodes.

  @param node_uuids: UUIDs of the nodes on which to look for the disks
  @raise errors.OpExecError: if a disk cannot be found on one of the nodes

  """
  for (idx, dev) in enumerate(self.instance.disks):
    if idx not in self.disks:
      continue

    for node_uuid in node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", idx,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, dev, self.instance)

      failure = found.fail_msg
      if not failure and found.payload:
        # disk is there, carry on with the next node
        continue

      failure = failure or "disk not found"

      # Tell the user when the real problem might be inactive disks
      if self._CheckDisksActivated(self.instance):
        extra_hint = ""
      else:
        extra_hint = ("\nDisks seem to be not properly activated. Try"
                      " running activate-disks on the instance before"
                      " using replace-disks.")
      raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                               (idx, self.cfg.GetNodeName(node_uuid), failure,
                                extra_hint))
2218
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
  """Check the consistency of the disks to be replaced on one node.

  @param node_uuid: UUID of the node to check
  @param on_primary: passed through to L{CheckDiskConsistency}
  @param ldisk: passed through to L{CheckDiskConsistency}
  @raise errors.OpExecError: if one of the disks fails the check

  """
  # Lazily select only the disks which are being replaced
  selected = ((idx, dev)
              for (idx, dev) in enumerate(self.instance.disks)
              if idx in self.disks)

  for (idx, dev) in selected:
    node_name = self.cfg.GetNodeName(node_uuid)
    self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                    (idx, node_name))

    is_consistent = CheckDiskConsistency(self.lu, self.instance, dev,
                                         node_uuid, on_primary, ldisk=ldisk)
    if not is_consistent:
      raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                               " replace disks for instance %s" %
                               (self.cfg.GetNodeName(node_uuid),
                                self.instance.name))
2233
def _CreateNewStorage(self, node_uuid):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every disk selected in C{self.disks} a new data and a new meta
  volume are created on the node, in the volume groups of the disk's
  current children.

  @param node_uuid: UUID of the node on which to create the new volumes
  @return: dict mapping each disk's C{iv_name} to a tuple of
      (annotated disk, copies of its current children, new volumes)
  @raise errors.OpExecError: if a block device cannot be created

  """
  iv_names = {}

  # The setting depends only on the node, so look it up once instead of
  # once per disk
  excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d",
                    self.cfg.GetNodeName(node_uuid), idx)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    # The new volumes reuse the volume groups and parameters of the
    # existing data/meta children
    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2280
def _CheckDevices(self, node_uuid, iv_names):
  """Check that every replaced device is found and not degraded.

  @param node_uuid: UUID of the node on which to look up the devices
  @param iv_names: dict whose values are 3-tuples starting with the device
  @raise errors.OpExecError: if a device cannot be found or is degraded

  """
  for (name, (dev, _old, _new)) in iv_names.iteritems():
    found = _BlockdevFind(self, node_uuid, dev, self.instance)

    failure = found.fail_msg
    if failure or not found.payload:
      raise errors.OpExecError("Can't find DRBD device %s: %s" %
                               (name, failure or "disk not found"))

    if found.payload.is_degraded:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)
2294
2295 - def _RemoveOldStorage(self, node_uuid, iv_names):
2296 for name, (_, old_lvs, _) in iv_names.iteritems(): 2297 self.lu.LogInfo("Remove logical volumes for %s", name) 2298 2299 for lv in old_lvs: 2300 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \ 2301 .fail_msg 2302 if msg: 2303 self.lu.LogWarning("Can't remove old LV: %s", msg, 2304 hint="remove unused LVs manually")
2305
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: not used by this method
  @raise errors.OpExecError: if the new volumes cannot be attached to the
      drbd device (other failures are raised by the helpers and by the
      C{Raise} method of the RPC results)

  """
  steps_total = 6

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
  self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(
    self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
    False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node_uuid)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                   (dev, self.instance),
                                                   (old_lvs, self.instance))
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" %
                 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == unique_id on that node

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.logical_id[0],
                              d.logical_id[1] + "_replaced-%s" % suff)

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                           (to_ren, self.instance))
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.logical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s",
                    self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                (dev, self.instance),
                                                (new_lvs, self.instance))
    msg = result.fail_msg
    if msg:
      # Attaching failed: try to remove the new volumes again before
      # giving up
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                             (new_lv, self.instance)).fail_msg
        if msg2:
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # Steps 5 and 6 are "remove old storage" and "sync devices"; their order
  # depends on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2451
def _ExecDrbd8Secondary(self, feedback_fn):
  """Replace the secondary node for DRBD 8.

  The algorithm for replace is quite complicated:
    - for all disks of the instance:
      - create new LVs on the new node with same names
      - shutdown the drbd device on the old secondary
      - disconnect the drbd network on the primary
      - create the drbd device on the new secondary
      - network attach the drbd on the primary, using an artifice:
        the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
    - wait for sync across all devices
    - remove all disks from the old secondary

  Failures are not very well handled.

  @param feedback_fn: passed to the configuration update
  @raise errors.OpExecError: if a block device cannot be created, the
      primary cannot be detached from the network or the disks cannot be
      attached to the new secondary

  """
  steps_total = 6

  pnode = self.instance.primary_node

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.instance.primary_node])
  self._CheckVolumeGroup([self.instance.primary_node])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(self.instance.primary_node, True, True)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                self.new_node_uuid)
  for idx, dev in enumerate(disks):
    self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # we pass force_create=True to force LVM creation
    for new_lv in dev.children:
      try:
        _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                             new_lv, True, GetInstanceInfoText(self.instance),
                             False, excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  # Step 4: drbd minors and drbd setups changes
  # after this, we must manually remove the drbd minors on both the
  # error and the success paths
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                       for _ in self.instance.disks],
                                      self.instance.uuid)
  logging.debug("Allocated minors %r", minors)

  iv_names = {}
  for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
    self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # create new devices on new_node; note that we create two IDs:
    # one without port, so the drbd will be activated without
    # networking information on the new node at this stage, and one
    # with network, for the later activation (the attach further down)
    (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
    if self.instance.primary_node == o_node1:
      p_minor = o_minor1
    else:
      assert self.instance.primary_node == o_node2, "Three-node instance?"
      p_minor = o_minor2

    new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                    p_minor, new_minor, o_secret)
    new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                  p_minor, new_minor, o_secret)

    # Keyed by disk index; the networked ID is written to the disks once
    # the primary has been detached from the old secondary
    iv_names[idx] = (dev, dev.children, new_net_id)
    logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                  new_net_id)
    new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                            logical_id=new_alone_id,
                            children=dev.children,
                            size=dev.size,
                            params={})
    (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                          self.cfg)
    try:
      CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                           anno_new_drbd,
                           GetInstanceInfoText(self.instance), False,
                           excl_stor)
    except errors.GenericError:
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise

  # We have new devices, shutdown the drbd on the old secondary
  for idx, dev in enumerate(self.instance.disks):
    self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
    msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                          (dev, self.instance)).fail_msg
    if msg:
      # Only a warning: the replacement continues even if the old
      # secondary could not be shut down
      self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                         " node: %s" % (idx, msg),
                         hint=("Please cleanup this device manually as"
                               " soon as possible"))

  self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
  result = self.rpc.call_drbd_disconnect_net(
    [pnode], (self.instance.disks, self.instance))[pnode]

  msg = result.fail_msg
  if msg:
    # detaches didn't succeed (unlikely)
    self.cfg.ReleaseDRBDMinors(self.instance.uuid)
    raise errors.OpExecError("Can't detach the disks from the network on"
                             " old node: %s" % (msg,))

  # if we managed to detach at least one, we update all the disks of
  # the instance to point to the new secondary
  self.lu.LogInfo("Updating instance configuration")
  for dev, _, new_logical_id in iv_names.itervalues():
    dev.logical_id = new_logical_id

  self.cfg.Update(self.instance, feedback_fn)

  # Release all node locks (the configuration has been updated)
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # and now perform the drbd attach
  self.lu.LogInfo("Attaching primary drbds to new secondary"
                  " (standalone => connected)")
  result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                          self.new_node_uuid],
                                         (self.instance.disks, self.instance),
                                         self.instance.name,
                                         False)
  for to_node, to_result in result.items():
    msg = to_result.fail_msg
    if msg:
      raise errors.OpExecError(
        "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
        "info %s to see the status of disks)" %
        (self.cfg.GetNodeName(to_node), msg, self.instance.name))

  # Steps 5 and 6 are "remove old storage" and "sync devices"; their order
  # depends on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2626