Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Logical units dealing with storage of instances.""" 
  23   
  24  import itertools 
  25  import logging 
  26  import os 
  27  import time 
  28   
  29  from ganeti import compat 
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import ht 
  33  from ganeti import locking 
  34  from ganeti.masterd import iallocator 
  35  from ganeti import objects 
  36  from ganeti import utils 
  37  from ganeti import rpc 
  38  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  39  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  40    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  41    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  42    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  43    CheckDiskTemplateEnabled 
  44  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  45    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  46    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  47   
  48  import ganeti.masterd.instance 
  49   
  50   
# Per-template prefix used when generating disk names: GenerateDiskTemplate
# formats "<prefix>.disk<N>" from these values. Templates that are missing
# from this map do not get any generated names.
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create one block device on the given node.

  Children of the device are not handled here; they have to exist
  already when this function is called.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  create_res = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                           instance.name, force_open, info,
                                           excl_stor)
  failure_msg = ("Can't create block device %s on"
                 " node %s for instance %s" %
                 (device, lu.cfg.GetNodeName(node_uuid), instance.name))
  create_res.Raise(failure_msg)

  # Keep an already known physical ID; only fill it in when it is missing
  if device.physical_id is not None:
    return
  device.physical_id = create_res.payload
94
95 96 -def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create, 97 info, force_open, excl_stor):
98 """Create a tree of block devices on a given node. 99 100 If this device type has to be created on secondaries, create it and 101 all its children. 102 103 If not, just recurse to children keeping the same 'force' value. 104 105 @attention: The device has to be annotated already. 106 107 @param lu: the lu on whose behalf we execute 108 @param node_uuid: the node on which to create the device 109 @type instance: L{objects.Instance} 110 @param instance: the instance which owns the device 111 @type device: L{objects.Disk} 112 @param device: the device to create 113 @type force_create: boolean 114 @param force_create: whether to force creation of this device; this 115 will be change to True whenever we find a device which has 116 CreateOnSecondary() attribute 117 @param info: the extra 'metadata' we should attach to the device 118 (this will be represented as a LVM tag) 119 @type force_open: boolean 120 @param force_open: this parameter will be passes to the 121 L{backend.BlockdevCreate} function where it specifies 122 whether we run on primary or not, and it affects both 123 the child assembly and the device own Open() execution 124 @type excl_stor: boolean 125 @param excl_stor: Whether exclusive_storage is active for the node 126 127 @return: list of created devices 128 """ 129 created_devices = [] 130 try: 131 if device.CreateOnSecondary(): 132 force_create = True 133 134 if device.children: 135 for child in device.children: 136 devs = _CreateBlockDevInner(lu, node_uuid, instance, child, 137 force_create, info, force_open, excl_stor) 138 created_devices.extend(devs) 139 140 if not force_create: 141 return created_devices 142 143 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open, 144 excl_stor) 145 # The device has been completely created, so there is no point in keeping 146 # its subdevices in the list. We just add the device itself instead. 
147 created_devices = [(node_uuid, device)] 148 return created_devices 149 150 except errors.DeviceCreationError, e: 151 e.created_devices.extend(created_devices) 152 raise e 153 except errors.OpExecError, e: 154 raise errors.DeviceCreationError(str(e), created_devices)
155
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage is in effect for a node given by UUID.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)

  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)
174
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Annotate the root device and hand over to L{_CreateBlockDevInner}.

  The exclusive_storage flag of the target node is looked up here and
  passed down to the inner function.

  @return: the value returned by L{_CreateBlockDevInner}

  """
  annotated = AnnotateDiskParams(instance, [device], lu.cfg)
  # Exactly one disk went in, so exactly one is expected back
  (root_disk,) = annotated
  es_enabled = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, root_disk,
                              force_create, info, force_open, es_enabled)
187
def _UndoCreateDisks(lu, disks_created):
  """Remove the devices that L{CreateDisks} managed to create.

  Meant to be called after an error, to roll back the work of
  L{CreateDisks}. Removal failures are only logged as warnings.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for entry in disks_created:
    (node_uuid, disk) = entry
    lu.cfg.SetDiskID(disk, node_uuid)
    removal = lu.rpc.call_blockdev_remove(node_uuid, disk)
    warn_msg = ("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)))
    removal.Warn(warn_msg, logging.warning)
205
206 207 -def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208 """Create all disks for an instance. 209 210 This abstracts away some work from AddInstance. 211 212 @type lu: L{LogicalUnit} 213 @param lu: the logical unit on whose behalf we execute 214 @type instance: L{objects.Instance} 215 @param instance: the instance whose disks we should create 216 @type to_skip: list 217 @param to_skip: list of indices to skip 218 @type target_node_uuid: string 219 @param target_node_uuid: if passed, overrides the target node for creation 220 @type disks: list of {objects.Disk} 221 @param disks: the disks to create; if not specified, all the disks of the 222 instance are created 223 @return: information about the created disks, to be used to call 224 L{_UndoCreateDisks} 225 @raise errors.OpPrereqError: in case of error 226 227 """ 228 info = GetInstanceInfoText(instance) 229 if target_node_uuid is None: 230 pnode_uuid = instance.primary_node 231 all_node_uuids = instance.all_nodes 232 else: 233 pnode_uuid = target_node_uuid 234 all_node_uuids = [pnode_uuid] 235 236 if disks is None: 237 disks = instance.disks 238 239 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template) 240 241 if instance.disk_template in constants.DTS_FILEBASED: 242 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) 243 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir) 244 245 result.Raise("Failed to create directory '%s' on" 246 " node %s" % (file_storage_dir, 247 lu.cfg.GetNodeName(pnode_uuid))) 248 249 disks_created = [] 250 for idx, device in enumerate(disks): 251 if to_skip and idx in to_skip: 252 continue 253 logging.info("Creating disk %s for instance '%s'", idx, instance.name) 254 for node_uuid in all_node_uuids: 255 f_create = node_uuid == pnode_uuid 256 try: 257 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info, 258 f_create) 259 disks_created.append((node_uuid, device)) 260 except errors.DeviceCreationError, e: 261 logging.warning("Creating disk %s for instance '%s' failed", 262 
idx, instance.name) 263 disks_created.extend(e.created_devices) 264 _UndoCreateDisks(lu, disks_created) 265 raise errors.OpExecError(e.message) 266 return disks_created
267
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @param disk_template: the disk template the disks will use
  @param disks: list of disk definitions (dicts); each one must carry the
      L{constants.IDISK_VG} and L{constants.IDISK_SIZE} keys
  @rtype: dict
  @return: mapping of volume group name to the total space required in it;
      empty for templates that do not use volume groups
  @raise errors.ProgrammerError: if the size requirements of the disk
      template are not known

  """
  def _compute(disks, payload):
    """Universal algorithm.

    Sums, per volume group, the size of every disk plus a fixed
    per-disk payload.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # Accumulate under the disk's own VG name; looking up the constant
      # key here would discard the running total of the VG
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def ComputeDisks(op, default_vg):
  """Validate and normalize the disks requested by an instance opcode.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks
  @raise errors.OpPrereqError: if a disk has an invalid access mode, a
      missing or invalid size, a provider given for a non-ext template,
      or no provider for the ext template

  """
  optional_keys = [
    constants.IDISK_METAVG,
    constants.IDISK_ADOPT,
    constants.IDISK_SPINDLES,
    ]

  computed = []
  for spec in op.disks:
    mode = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    ext_provider = spec.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = spec.get(constants.IDISK_VG, default_vg)

    # The literal "none" value (in any case) means that no name is set
    name = spec.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None

    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    # Optional parameters are copied over only when they were given
    new_disk.update([(key, spec[key]) for key in optional_keys
                     if key in spec])

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not ext_provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      new_disk[constants.IDISK_PROVIDER] = ext_provider
      for key in spec:
        if key not in constants.IDISK_PARAMS:
          new_disk[key] = spec[key]

    computed.append(new_disk)

  return computed
366
def CheckRADOSFreeSpace():
  """Check the disk size requirements inside the RADOS cluster.

  Currently this performs no check at all.

  """
  # For the RADOS cluster we assume there is always enough space, hence
  # there is nothing to verify.
  return None
374
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Build a drbd8 disk object together with its data and meta children.

  @param lu: the logical unit on whose behalf we execute
  @param primary_uuid: first node of the DRBD logical ID
  @param secondary_uuid: second node of the DRBD logical ID
  @param size: size of the DRBD device and of its data volume
  @param vgnames: two volume group names, for data and metadata
  @param names: two volume names, for data and metadata
  @param iv_name: the iv_name of the DRBD device
  @param p_minor: the minor placed after the port in the logical ID
  @param s_minor: the minor placed after C{p_minor} in the logical ID
  @return: the drbd8 L{objects.Disk}, with both volumes as children

  """
  assert len(vgnames) == len(names) == 2

  def _NewLV(vg_name, lv_name, lv_size):
    """Create a plain disk object and give it a fresh UUID.

    """
    lv = objects.Disk(dev_type=constants.DT_PLAIN, size=lv_size,
                      logical_id=(vg_name, lv_name),
                      params={})
    lv.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    return lv

  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  data_lv = _NewLV(vgnames[0], names[0], size)
  meta_lv = _NewLV(vgnames[1], names[1], constants.DRBD_META_SIZE)

  drbd_logical_id = (primary_uuid, secondary_uuid, port,
                     p_minor, s_minor,
                     shared_secret)
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=drbd_logical_id,
                          children=[data_lv, meta_lv],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
402
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @param template_name: the disk template to generate the layout for
  @param instance_uuid: the instance the DRBD minors are allocated for
  @param primary_node_uuid: the primary node (used for DRBD disks)
  @param secondary_node_uuids: list of secondary nodes; must contain
      exactly one node for DRBD8 and be empty for all other templates
  @param disk_info: list of disk definitions (dicts)
  @param file_storage_dir: directory used in the logical ID of file-based
      disks
  @param file_driver: driver used in the logical ID of file-based disks
  @param base_index: index of the first generated disk
  @param feedback_fn: function called with a message for every non-DRBD
      disk that is generated
  @param full_disk_params: disk parameters, used to compute the default
      meta volume group for DRBD8
  @return: list of generated L{objects.Disk} objects
  @raise errors.ProgrammerError: if the node configuration doesn't match
      the template, the template is unknown, or an ext disk has no provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    # Two minors per disk, alternating primary and secondary node
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two consecutive names per disk: data volume first, then metadata
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    # Templates without a name prefix do not get generated names
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Every logical_id_fn takes (idx, disk_index, disk) and uses only its
    # own arguments, never the loop variables of this function
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda idx, _, disk: (file_driver,
                              "%s/%s" % (file_storage_dir,
                                         names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Validate the spindles option against the exclusive_storage setting.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not,
      or when they are required but missing

  """
  def _HasSpindles():
    """Whether a spindles value other than None is present.

    """
    return diskdict.get(constants.IDISK_SPINDLES, None) is not None

  if es_flag:
    if required and not _HasSpindles():
      raise errors.OpPrereqError("You must specify spindles in instance disks"
                                 " when exclusive storage is active",
                                 errors.ECODE_INVAL)
  elif _HasSpindles():
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
537
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters that may be changed while recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Ask the iallocator named in the opcode for replacement nodes.

    On success the selected nodes are stored in C{self.op.node_uuids} and
    C{self.op.nodes}.

    """
    inst = self.instance
    be_full = self.cfg.GetClusterInfo().FillBE(inst)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = inst.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]

    disk_specs = []
    for dsk in inst.disks:
      disk_specs.append({
        constants.IDISK_SIZE: dsk.size,
        constants.IDISK_MODE: dsk.mode,
        constants.IDISK_SPINDLES: dsk.spindles,
        })

    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(inst.GetTags()),
                                        os=inst.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disk_specs,
                                        hypervisor=inst.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Normalize and validate the disks given in the opcode.

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      unique_indices = sorted(frozenset(self.op.disks))
      self.op.disks = [(idx, {}) for idx in unique_indices]

    disk_indices = [compat.fst(entry) for entry in self.op.disks]
    duplicates = utils.FindDuplicates(disk_indices)
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declare the instance lock and the initial node-level locks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
    else:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      primary_groups = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid,
                                       primary_only=True)
      self.needed_locks[locking.LEVEL_NODEGROUP] = primary_groups

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        node_locks = self.needed_locks[locking.LEVEL_NODE]
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          node_locks.extend(self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)

    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the same list (master node followed by all instance nodes),
        given twice

    """
    hook_nodes = [self.cfg.GetMasterNode()]
    hook_nodes.extend(self.instance.all_nodes)
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if not self.op.node_uuids:
      primary_node = instance.primary_node
    else:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]

    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    replacing_nodes = self.op.iallocator or self.op.node_uuids
    if not (replacing_nodes and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    num_disks = len(instance.disks)
    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      # No selection given: recreate every disk, without any changes
      self.disks = dict([(idx, {}) for idx in range(num_disks)])

    maxidx = max(self.disks)
    if maxidx >= num_disks:
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
        sorted(self.disks) != range(num_disks)):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      for lock_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
        ReleaseLocks(self, lock_level, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    node_uuids = instance.all_nodes
    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    es_by_node = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
    excl_stor = compat.any(es_by_node.values())
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.disks:
        # Disk should not be recreated
        to_skip.append(idx)
        continue
      changes = self.disks[idx]

      # update secondaries for disks, if needed
      new_id = None
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for (idx, new_id, changes) in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    if self.op.node_uuids:
      # change primary node, if needed
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      # Wipe every recreated disk, starting from offset 0
      wipedisks = []
      for (idx, disk) in enumerate(self.instance.disks):
        if idx not in to_skip:
          wipedisks.append((idx, disk, 0))
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)
863
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Build the arguments for a node info call and perform it.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the node info RPC call

  """
  requested_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, requested_units,
                                                  node_uuids)
  # Only the cluster's hypervisor type is queried, with its parameters
  hvname = lu.cfg.GetHypervisorType()
  cluster_hvparams = lu.cfg.GetClusterInfo().hvparams
  hv_specs = [(hvname, cluster_hvparams[hvname])]
  return lu.rpc.call_node_info(node_uuids, storage_units, hv_specs)
884
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    # Carry an error code like the other failures raised by this function,
    # so that all OpPrereqError instances raised here have the same shape
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)

  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    # The node answered, but without a usable free-space figure
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)

  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)
916
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Verifies that every given node has enough free space in one VG.

  One node info call is made for all nodes; afterwards each node's
  answer is validated in turn. A failed RPC answer or insufficient free
  space is reported as an OpPrereqError.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  all_info = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    node_result = all_info[uuid]
    node_result.Raise("Cannot get current information from node %s" % name,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, node_result.payload, vg, requested)
945
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Verifies that every given node has enough free space in each VG.

  For each (volume group, size) pair in C{req_sizes} the per-VG check is
  run against all given nodes; the first failing check aborts with an
  OpPrereqError.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for (vg_name, needed_mib) in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, needed_mib)
968
969 970 -def _DiskSizeInBytesToMebibytes(lu, size):
971 """Converts a disk size in bytes to mebibytes. 972 973 Warns and rounds up if the size isn't an even multiple of 1 MiB. 974 975 """ 976 (mib, remainder) = divmod(size, 1024 * 1024) 977 978 if remainder != 0: 979 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 980 " to not overwrite existing data (%s bytes will not be" 981 " wiped)", (1024 * 1024) - remainder) 982 mib += 1 983 984 return mib
985
986 987 -def _CalcEta(time_taken, written, total_size):
988 """Calculates the ETA based on size written and total size. 989 990 @param time_taken: The time taken so far 991 @param written: amount written so far 992 @param total_size: The total size of data to be written 993 @return: The remaining time in seconds 994 995 """ 996 avg_time = time_taken / float(written) 997 return (total_size - written) * avg_time
998
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node for the duration of
  the wipe and resumed afterwards, even if wiping fails. Each disk is
  wiped in chunks from its start offset up to its size.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from offset 0

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  devices = [device for (_, device, _) in disks]

  for device in devices:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (devices, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # Never go below 1: a chunk size truncated to 0 (very small disks)
      # would wipe nothing per iteration and the loop below would never end.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 *
                       constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      # Remember where this run started; progress made before this run
      # must not be counted as throughput when estimating the ETA
      start_offset = offset
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once a minute (and after the first chunk,
        # as last_output starts at 0)
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (devices, instance),
                                                    False)

    # Failing to resume is only warned about, so that an error raised by the
    # wipe itself is not masked
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1095
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wipes disks via L{WipeDisks}, undoing disk creation on failure.

  If the wipe raises L{errors.OpExecError}, a warning is logged, the
  disks described by C{cleanup} are undone and the original exception is
  re-raised.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
    case of error
  @raise errors.OpExecError: re-raised unchanged when wiping fails

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed", instance.name)
    # Roll back what CreateDisks did before propagating the failure
    _UndoCreateDisks(lu, cleanup)
    raise
1117
def ExpandCheckDisks(instance, disks):
  """Resolves a disk selection against the disks of an instance.

  @type instance: object with a C{disks} attribute
  @param instance: the instance the selection refers to
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks; None selects all disks of the instance
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on
  @raise errors.ProgrammerError: if a selected disk is not one of the
    instance's disks

  """
  if disks is None:
    return instance.disks

  if set(disks).issubset(instance.disks):
    return disks

  raise errors.ProgrammerError("Can only act on disks belonging to the"
                               " target instance: expected a subset of %r,"
                               " got %r" % (instance.disks, disks))
1136
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  The mirror status is queried from the primary node until no device
  reports a sync percentage any more (or only once, if C{oneshot} is
  set). If the disks look degraded at that point, a few short retries
  are done to rule out a transient state.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are checked
  @param disks: the disks to wait for, or None for all instance disks
  @param oneshot: if true, query the status without waiting for completion
  @return: True if no checked disk is degraded (also when there is
    nothing to check), False otherwise
  @raise errors.RemoteError: if the node cannot be queried ten times in a row

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      # A device only counts as degraded if it is not currently syncing
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          # Keep the longest estimate over all devices; simply taking the
          # last one would make the sleep below depend on device order
          max_time = max(max_time, mstat.estimated_time)
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1213
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  The instance's disks are first marked inactive in the configuration;
  then every selected disk is shut down on all nodes of its node tree.

  A failure on the primary node makes the result False unless
  C{ignore_primary} is true; a failure on any other node makes the result
  False unless the RPC result flags that node as offline.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down, or None for all instance disks
  @param ignore_primary: if true, errors on the primary node are ignored
  @return: False if any non-ignored shutdown failed, True otherwise

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  success = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if not msg:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if node_uuid == instance.primary_node:
        counts_as_failure = not ignore_primary
      else:
        counts_as_failure = not result.offline
      if counts_as_failure:
        success = False

  return success
1240
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance after a state check.

  L{CheckInstanceState} is invoked with C{INSTANCE_DOWN} first; only
  afterwards L{ShutdownInstanceDisks} is called. The result of the
  shutdown is not returned.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down, or None for all instance disks

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1251
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes, in two passes: first on
  every node in secondary mode, then on the primary node in primary mode.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors in the first (secondary mode)
      pass won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: a pair (disks_ok, device_info); disks_ok is False if the
      operation failed, device_info is a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  all_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for node_uuid, node_disk in node_tree:
      if ignore_size:
        # work on a copy, so the configured size is left untouched
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if not msg:
        continue

      is_offline_secondary = (node_uuid in instance.secondary_nodes and
                              result.offline)
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if not ignore_secondaries and not is_offline_secondary:
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for node_uuid, node_disk in node_tree:
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        all_ok = False
      else:
        dev_path = result.payload

    primary_name = lu.cfg.GetNodeName(instance.primary_node)
    device_info.append((primary_name, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not all_ok:
    # undo the optimistic "active" marking done before the first pass
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return all_ok, device_info
1352
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  The disks are assembled with C{force} used as the C{ignore_secondaries}
  flag. If assembling fails, the disks are shut down again and an error is
  raised; when C{force} was explicitly false, a hint about '--force' is
  logged first.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are started
  @param force: whether errors on secondary nodes should be ignored
  @raise errors.OpExecError: if the disks could not be assembled

  """
  (assembled, _) = AssembleInstanceDisks(lu, instance,
                                         ignore_secondaries=force)
  if assembled:
    return

  ShutdownInstanceDisks(lu, instance)
  # A force value of None suppresses the hint
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1367
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node and node resource locks follow later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the node locks and mirror them as node resource locks.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes
    are online, that its disk template is growable and that the requested
    size change is not negative. It also computes C{self.delta} (growth
    amount) and C{self.target} (resulting size).

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # the amount is the new total size
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # the amount is an increment on top of the current size
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Check the free space on the nodes, where such a check applies.

    @param node_uuids: the nodes to check
    @param req_vgspace: required space per volume group, as accepted by
      L{CheckNodesFreeDiskPerVG}

    """
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The grow is first tried in dry-run mode on all nodes, then done for
    real on the backing storage of all nodes and finally on the logical
    storage of the primary node. Afterwards the new size is recorded and,
    depending on the cluster and opcode settings, the new space is wiped
    and the disk sync is waited for.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      primary_uuid = self.instance.primary_node
      # Report the node by name, like all other messages of this LU do,
      # instead of by its UUID
      primary_name = self.cfg.GetNodeName(primary_uuid)
      self.cfg.SetDiskID(self.disk, primary_uuid)
      result = self.rpc.call_blockdev_getdimensions(primary_uuid,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   primary_name)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % primary_name)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    # the old size is known if and only if wiping was requested
    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1567
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The actual work is delegated to a L{TLReplaceDisks} tasklet; this class
  validates the arguments and takes care of locking and hooks.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    A new node or an iallocator is required when changing the secondary
    and not allowed in any other mode.

    """
    no_node = self.op.remote_node is None
    no_iallocator = self.op.iallocator is None

    if self.op.mode == constants.REPLACE_DISK_CHG:
      if no_node and no_iallocator:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      CheckIAllocatorOrNode(self, "iallocator", "remote_node")
      return

    if not (no_node and no_iallocator):
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance, prepare the other lock levels, create the tasklet.

    """
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is None:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
    else:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator,
                                   self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Declare the locks needed at the given level.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is None:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()
      else:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        group_uuids = self.owned_locks(locking.LEVEL_NODEGROUP)
        self.needed_locks[locking.LEVEL_NODE] = \
          [member_uuid
           for group_uuid in group_uuids
           for member_uuid in self.cfg.GetNodeGroup(group_uuid).members]

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    inst = self.replacer.instance
    old_secondary = self.cfg.GetNodeName(inst.secondary_nodes[0])
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": old_secondary,
      }
    env.update(BuildInstanceHookEnvByObject(self, inst))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    inst = self.replacer.instance
    hook_nodes = [self.cfg.GetMasterNode(), inst.primary_node]
    if self.op.remote_node_uuid is not None:
      hook_nodes.append(self.op.remote_node_uuid)
    return hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    Re-verifies the optimistically acquired node group locks, then runs
    the generic (tasklet-based) prerequisite check.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    locked_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if locked_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, locked_groups)

    return LogicalUnit.CheckPrereq(self)
1710 1711 -class LUInstanceActivateDisks(NoHooksLU):
1712 """Bring up an instance's disks. 1713 1714 """ 1715 REQ_BGL = False 1716
1717 - def ExpandNames(self):
1718 self._ExpandAndLockInstance() 1719 self.needed_locks[locking.LEVEL_NODE] = [] 1720 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1721
1722 - def DeclareLocks(self, level):
1723 if level == locking.LEVEL_NODE: 1724 self._LockInstancesNodes()
1725
1726 - def CheckPrereq(self):
1727 """Check prerequisites. 1728 1729 This checks that the instance is in the cluster. 1730 1731 """ 1732 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid) 1733 assert self.instance is not None, \ 1734 "Cannot retrieve locked instance %s" % self.op.instance_name 1735 CheckNodeOnline(self, self.instance.primary_node)
1736
1737 - def Exec(self, feedback_fn):
1738 """Activate the disks. 1739 1740 """ 1741 disks_ok, disks_info = \ 1742 AssembleInstanceDisks(self, self.instance, 1743 ignore_size=self.op.ignore_size) 1744 if not disks_ok: 1745 raise errors.OpExecError("Cannot activate block devices") 1746 1747 if self.op.wait_for_sync: 1748 if not WaitForSync(self, self.instance): 1749 self.cfg.MarkInstanceDisksInactive(self.instance.uuid) 1750 raise errors.OpExecError("Some disks of the instance are degraded!") 1751 1752 return disks_info
1753
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the instance's nodes when the node level is reached.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = inst
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    With the force option the disks are shut down unconditionally;
    otherwise the instance state is checked first.

    """
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, self.instance)
    else:
      ShutdownInstanceDisks(self, self.instance)
1788
1789 1790 -def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary, 1791 ldisk=False):
1792 """Check that mirrors are not degraded. 1793 1794 @attention: The device has to be annotated already. 1795 1796 The ldisk parameter, if True, will change the test from the 1797 is_degraded attribute (which represents overall non-ok status for 1798 the device(s)) to the ldisk (representing the local storage status). 1799 1800 """ 1801 lu.cfg.SetDiskID(dev, node_uuid) 1802 1803 result = True 1804 1805 if on_primary or dev.AssembleOnSecondary(): 1806 rstats = lu.rpc.call_blockdev_find(node_uuid, dev) 1807 msg = rstats.fail_msg 1808 if msg: 1809 lu.LogWarning("Can't find disk on node %s: %s", 1810 lu.cfg.GetNodeName(node_uuid), msg) 1811 result = False 1812 elif not rstats.payload: 1813 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid)) 1814 result = False 1815 else: 1816 if ldisk: 1817 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 1818 else: 1819 result = result and not rstats.payload.is_degraded 1820 1821 if dev.children: 1822 for child in dev.children: 1823 result = result and _CheckDiskConsistencyInner(lu, instance, child, 1824 node_uuid, on_primary) 1825 1826 return result
1827
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  The device is first run through L{AnnotateDiskParams}; the resulting
  disk object is what gets checked.

  @param lu: the logical unit providing C{cfg}, C{rpc} and logging
  @param instance: the instance owning the device
  @param dev: the device to check
  @param node_uuid: the node on which to check the device
  @param on_primary: whether C{node_uuid} is the primary node
  @param ldisk: passed through to L{_CheckDiskConsistencyInner}
  @return: the result of L{_CheckDiskConsistencyInner}

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  (annotated_dev,) = annotated

  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node_uuid,
                                    on_primary, ldisk=ldisk)
1836
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @return: The result of the rpc call

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  (annotated_dev,) = annotated

  return lu.rpc.call_blockdev_find(node_uuid, annotated_dev)
1850
1851 1852 -def _GenerateUniqueNames(lu, exts):
1853 """Generate a suitable LV name. 1854 1855 This will generate a logical volume name for the given instance. 1856 1857 """ 1858 results = [] 1859 for val in exts: 1860 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 1861 results.append("%s%s" % (new_id, val)) 1862 return results
1863
1864 1865 -class TLReplaceDisks(Tasklet):
1866 """Replaces disks for an instance. 1867 1868 Note: Locking is not within the scope of this class. 1869 1870 """
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
             remote_node_uuid, disks, early_release, ignore_ipolicy):
  """Initializes this class.

  The arguments are only stored on the tasklet; nothing is validated or
  looked up at this point.

  @param lu: the logical unit this tasklet belongs to
  @param instance_uuid: UUID of the instance whose disks are replaced
  @param instance_name: name of that instance
  @param mode: the disk replacement mode
  @param iallocator_name: name of the iallocator to use, or None
  @param remote_node_uuid: UUID of the requested new node, or None
  @param disks: indices of the disks to replace
  @param early_release: whether old storage is removed before the sync
  @param ignore_ipolicy: whether instance policy violations are ignored

  """
  Tasklet.__init__(self, lu)

  # What is being operated on
  self.instance_uuid = instance_uuid
  self.instance_name = instance_name
  self.disks = disks

  # How the replacement is to be done
  self.mode = mode
  self.iallocator_name = iallocator_name
  self.remote_node_uuid = remote_node_uuid
  self.early_release = early_release
  self.ignore_ipolicy = ignore_ipolicy

  # Runtime data, not known yet at construction time
  self.instance = None
  self.remote_node_info = None
  self.node_secondary_ip = None
  self.new_node_uuid = None
  self.target_node_uuid = None
  self.other_node_uuid = None
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
                  relocate_from_node_uuids):
  """Compute a new secondary node using an IAllocator.

  @param lu: the logical unit providing C{cfg}, C{rpc} and logging
  @param iallocator_name: name of the iallocator to run
  @param instance_uuid: UUID of the instance to relocate
  @param relocate_from_node_uuids: nodes the instance is moved away from
  @return: the UUID of the node chosen by the iallocator
  @raise errors.OpPrereqError: if the iallocator run is not successful
      or the returned node is not found in the configuration

  """
  request = iallocator.IAReqRelocate(
    inst_uuid=instance_uuid,
    relocate_from_node_uuids=list(relocate_from_node_uuids))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if not allocator.success:
    failure = ("Can't compute nodes using iallocator '%s': %s" %
               (iallocator_name, allocator.info))
    raise errors.OpPrereqError(failure, errors.ECODE_NORES)

  # Only the first entry of the iallocator result is used
  chosen_name = allocator.result[0]
  chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)

  if chosen_node is None:
    raise errors.OpPrereqError("Node %s not found in configuration" %
                               chosen_name, errors.ECODE_NOENT)

  lu.LogInfo("Selected new secondary for instance '%s': %s",
             instance_uuid, chosen_name)

  return chosen_node.uuid
1925
def _FindFaultyDisks(self, node_uuid):
  """Wrapper for L{FindFaultyInstanceDisks}.

  @param node_uuid: the node on which to look for faulty disks
  @return: whatever L{FindFaultyInstanceDisks} returns for this
      tasklet's instance on the given node

  """
  instance = self.instance

  return FindFaultyInstanceDisks(self.cfg, self.rpc, instance, node_uuid,
                                 True)
1932
def _CheckDisksActivated(self, instance):
  """Checks if the instance disks are activated.

  Every disk is looked up on every node of the instance. Results marked
  as offline are skipped; a failed lookup or an empty payload means the
  disks are not considered activated.

  @param instance: The instance to check disks
  @return: True if they are activated, False otherwise

  """
  all_node_uuids = instance.all_nodes

  for disk_idx, disk in enumerate(instance.disks):
    for node_uuid in all_node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", disk_idx,
                      self.cfg.GetNodeName(node_uuid))
      self.cfg.SetDiskID(disk, node_uuid)

      found = _BlockdevFind(self, node_uuid, disk, instance)

      if found.offline:
        continue

      if found.fail_msg or not found.payload:
        return False

  return True
1956
def CheckPrereq(self):
  """Check prerequisites.

  Verifies that the instance exists, is DRBD8-based and has exactly one
  secondary node, determines the new secondary node (given explicitly
  or computed by the iallocator), and depending on the replacement mode
  works out the target/other/new nodes and the disks to handle. Locks
  which are no longer needed are released at the end.

  @raise errors.OpPrereqError: if the request cannot be fulfilled
  @raise errors.ProgrammerError: for an unknown replacement mode

  """
  self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
  assert self.instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  if self.instance.disk_template != constants.DT_DRBD8:
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  num_secondaries = len(self.instance.secondary_nodes)
  if num_secondaries != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(self.instance.secondary_nodes),
                               errors.ECODE_FAULT)

  secondary_node_uuid = self.instance.secondary_nodes[0]

  # The new node is either given explicitly or chosen by the iallocator
  if self.iallocator_name is not None:
    remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                          self.instance.uuid,
                                          self.instance.secondary_nodes)
  else:
    remote_node_uuid = self.remote_node_uuid

  if remote_node_uuid is not None:
    assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
      "Remote node '%s' is not locked" % remote_node_uuid

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node_uuid
  else:
    self.remote_node_info = None

  if remote_node_uuid == self.instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node_uuid == secondary_node_uuid:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  # In these modes the disks are selected implicitly
  implicit_disk_modes = (constants.REPLACE_DISK_AUTO,
                         constants.REPLACE_DISK_CHG)
  if self.disks and self.mode in implicit_disk_modes:
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(self.instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)

    faulty_on_primary = self._FindFaultyDisks(self.instance.primary_node)
    faulty_on_secondary = self._FindFaultyDisks(secondary_node_uuid)

    if faulty_on_primary and faulty_on_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    if faulty_on_primary:
      self.disks = faulty_on_primary
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    elif faulty_on_secondary:
      self.disks = faulty_on_secondary
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    else:
      # Nothing is faulty: no disks to replace, no nodes to check
      self.disks = []
      check_nodes = []

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node_uuid = remote_node_uuid
      self.other_node_uuid = self.instance.primary_node
      self.target_node_uuid = secondary_node_uuid
      check_nodes = [self.new_node_uuid, self.other_node_uuid]

      CheckNodeNotDrained(self.lu, remote_node_uuid)
      CheckNodeVmCapable(self.lu, remote_node_uuid)

      old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
      assert old_node_info is not None
      if old_node_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode", secondary_node_uuid)

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced; this applies to the
    # non-automatic modes only, as the automatic mode may legitimately end
    # up with an empty list
    if not self.disks:
      self.disks = range(len(self.instance.disks))

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
                           self.remote_node_info, self.cfg,
                           ignore=self.ignore_ipolicy)

  for node_uuid in check_nodes:
    CheckNodeOnline(self.lu, node_uuid)

  candidate_nodes = [self.new_node_uuid,
                     self.other_node_uuid,
                     self.target_node_uuid]
  touched_nodes = frozenset(node_uuid for node_uuid in candidate_nodes
                            if node_uuid is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    self.instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict(
    (uuid, node.secondary_ip)
    for (uuid, node) in self.cfg.GetMultiNodeInfo(touched_nodes))
2110
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler:
  L{_ExecDrbd8Secondary} if a new node was selected, otherwise
  L{_ExecDrbd8DiskOnly}. If the instance's disks are not active they are
  started before the replacement and shut down again afterwards.

  @param feedback_fn: function used to report progress to the user
  @return: the result of the handler, or None if there are no disks to
      replace

  """
  if __debug__:
    # Verify owned locks before starting operation
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(owned_nodes) == set(self.node_secondary_ip), \
      ("Incorrect node locks, owning %s, expected %s" %
       (owned_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))
    assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(owned_instances) == [self.instance_name], \
      "Instance '%s' not locked" % self.instance_name

    assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
      "Should not own any node group lock at this point"

  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  # The message used to read "seconary"
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(self.cfg.GetNodeNames(
                                self.instance.secondary_nodes)))

  activate_disks = not self.instance.disks_active

  # Activate the instance disks if we're replacing them on a down instance
  if activate_disks:
    StartInstanceDisks(self.lu, self.instance, True)

  try:
    # Should we replace the secondary node?
    if self.new_node_uuid is not None:
      fn = self._ExecDrbd8Secondary
    else:
      fn = self._ExecDrbd8DiskOnly

    result = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a
    # down instance
    if activate_disks:
      _SafeShutdownInstanceDisks(self.lu, self.instance)

  # Both handlers release all node locks before returning
  assert not self.lu.owned_locks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    nodes = frozenset(self.node_secondary_ip)
    assert ((self.early_release and not owned_nodes) or
            (not self.early_release and not (set(owned_nodes) - nodes))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, owned_nodes, nodes))

  return result
2179
2180 - def _CheckVolumeGroup(self, node_uuids):
2181 self.lu.LogInfo("Checking volume groups") 2182 2183 vgname = self.cfg.GetVGName() 2184 2185 # Make sure volume group exists on all involved nodes 2186 results = self.rpc.call_vg_list(node_uuids) 2187 if not results: 2188 raise errors.OpExecError("Can't list volume groups on the nodes") 2189 2190 for node_uuid in node_uuids: 2191 res = results[node_uuid] 2192 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid)) 2193 if vgname not in res.payload: 2194 raise errors.OpExecError("Volume group '%s' not found on node %s" % 2195 (vgname, self.cfg.GetNodeName(node_uuid)))
2196
def _CheckDisksExistence(self, node_uuids):
  """Check that the selected disks can be found on the given nodes.

  Only disks whose index is listed in C{self.disks} are looked at.

  @param node_uuids: the nodes on which the disks must be present
  @raise errors.OpExecError: if a disk cannot be found on a node

  """
  for disk_idx, disk in enumerate(self.instance.disks):
    if disk_idx not in self.disks:
      continue

    for node_uuid in node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", disk_idx,
                      self.cfg.GetNodeName(node_uuid))
      self.cfg.SetDiskID(disk, node_uuid)

      found = _BlockdevFind(self, node_uuid, disk, self.instance)

      if not found.fail_msg and found.payload:
        # Disk is present on this node
        continue

      reason = found.fail_msg or "disk not found"

      # Give the user a hint when the cause is likely inactive disks
      if self._CheckDisksActivated(self.instance):
        extra_hint = ""
      else:
        extra_hint = ("\nDisks seem to be not properly activated. Try"
                      " running activate-disks on the instance before"
                      " using replace-disks.")

      raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                               (disk_idx, self.cfg.GetNodeName(node_uuid),
                                reason, extra_hint))
2223
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
  """Check the consistency of the selected disks on one node.

  Only disks whose index is listed in C{self.disks} are checked, each
  via L{CheckDiskConsistency}.

  @param node_uuid: the node on which to check the disks
  @param on_primary: whether C{node_uuid} is the primary node
  @param ldisk: passed through to L{CheckDiskConsistency}
  @raise errors.OpExecError: if a disk is not consistent

  """
  selected = [(disk_idx, disk)
              for (disk_idx, disk) in enumerate(self.instance.disks)
              if disk_idx in self.disks]

  for disk_idx, disk in selected:
    self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                    (disk_idx, self.cfg.GetNodeName(node_uuid)))

    is_consistent = CheckDiskConsistency(self.lu, self.instance, disk,
                                         node_uuid, on_primary, ldisk=ldisk)
    if not is_consistent:
      raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                               " replace disks for instance %s" %
                               (self.cfg.GetNodeName(node_uuid),
                                self.instance.name))
2238
def _CreateNewStorage(self, node_uuid):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every disk listed in C{self.disks} a new data and a new meta
  volume with freshly generated names are created on the node.

  @param node_uuid: the node on which to create the new volumes
  @return: dict mapping the C{iv_name} of each handled disk to a tuple
      C{(dev, old_lvs, new_lvs)}, where C{old_lvs} are copies of the
      current children of the device
  @raise errors.OpExecError: if a block device cannot be created

  """
  iv_names = {}

  # Does not depend on the disk being handled, so look it up only once
  excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d",
                    self.cfg.GetNodeName(node_uuid), idx)

    self.cfg.SetDiskID(dev, node_uuid)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    # The new volumes live in the same volume groups as the current ones
    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2287
def _CheckDevices(self, node_uuid, iv_names):
  """Check that all given devices exist and are not degraded.

  @param node_uuid: the node on which to look up the devices
  @param iv_names: dict whose values are tuples with the device as
      first element; the keys are used in error messages
  @raise errors.OpExecError: if a device is not found or is degraded

  """
  for name, (dev, _, _) in iv_names.iteritems():
    self.cfg.SetDiskID(dev, node_uuid)

    found = _BlockdevFind(self, node_uuid, dev, self.instance)

    if found.fail_msg or not found.payload:
      reason = found.fail_msg or "disk not found"
      raise errors.OpExecError("Can't find DRBD device %s: %s" %
                               (name, reason))

    if not found.payload.is_degraded:
      continue

    raise errors.OpExecError("DRBD device %s is degraded!" % name)
2303
def _RemoveOldStorage(self, node_uuid, iv_names):
  """Remove the old logical volumes from a node.

  Failures are only logged as warnings; they do not abort the removal
  of the remaining volumes.

  @param node_uuid: the node from which to remove the volumes
  @param iv_names: dict whose values are tuples with the list of old
      volumes as second element; the keys are used for logging

  """
  for name, (_, stale_lvs, _) in iv_names.iteritems():
    self.lu.LogInfo("Remove logical volumes for %s", name)

    for stale_lv in stale_lvs:
      self.cfg.SetDiskID(stale_lv, node_uuid)

      removal = self.rpc.call_blockdev_remove(node_uuid, stale_lv)
      failure = removal.fail_msg
      if not failure:
        continue

      self.lu.LogWarning("Can't remove old LV: %s", failure,
                         hint="remove unused LVs manually")
2315
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: feedback function (not used here)
  @raise errors.OpExecError: if the new storage cannot be attached to
      the drbd device, or one of the checks fails

  """
  steps_total = 6

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
  self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(
    self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
    False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node_uuid)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                   old_lvs)
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" %
                 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == physical_id (which in
    # turn is the unique_id on that node)

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.physical_id[0],
                              d.physical_id[1] + "_replaced-%s" % suff)

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.physical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id
      self.cfg.SetDiskID(new, self.target_node_uuid)

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)
      self.cfg.SetDiskID(disk, self.target_node_uuid)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s",
                    self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                (dev, self.instance), new_lvs)
    msg = result.fail_msg
    if msg:
      # Attaching failed: try to remove the new LVs again before giving up
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                             new_lv).fail_msg
        if msg2:
          # The hint used to read "logicalvolumes" (missing space)
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # Steps 5 and 6 swap places depending on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2461
def _ExecDrbd8Secondary(self, feedback_fn):
  """Replace the secondary node for DRBD 8.

  The algorithm for replace is quite complicated:
    - for all disks of the instance:
      - create new LVs on the new node with same names
      - shutdown the drbd device on the old secondary
      - disconnect the drbd network on the primary
      - create the drbd device on the new secondary
      - network attach the drbd on the primary, using an artifice:
        the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
    - wait for sync across all devices
    - remove all disks from the old secondary

  Failures are not very well handled.

  @param feedback_fn: function used to report progress to the user;
      passed on when updating the instance configuration
  @raise errors.OpExecError: if the new storage cannot be created or
      the primary cannot be detached from the network

  """
  steps_total = 6

  pnode = self.instance.primary_node

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.instance.primary_node])
  self._CheckVolumeGroup([self.instance.primary_node])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(self.instance.primary_node, True, True)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                self.new_node_uuid)
  for idx, dev in enumerate(disks):
    self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # we pass force_create=True to force LVM creation
    for new_lv in dev.children:
      try:
        _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                             new_lv, True, GetInstanceInfoText(self.instance),
                             False, excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  # Step 4: drbd minors and drbd setups changes
  # after this, we must manually remove the drbd minors on both the
  # error and the success paths
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                       for _ in self.instance.disks],
                                      self.instance.uuid)
  logging.debug("Allocated minors %r", minors)

  iv_names = {}
  for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
    self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # create new devices on new_node; note that we create two IDs:
    # one without port, so the drbd will be activated without
    # networking information on the new node at this stage, and one
    # with network, for the later activation in step 4
    (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
    if self.instance.primary_node == o_node1:
      p_minor = o_minor1
    else:
      assert self.instance.primary_node == o_node2, "Three-node instance?"
      p_minor = o_minor2

    new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                    p_minor, new_minor, o_secret)
    new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                  p_minor, new_minor, o_secret)

    iv_names[idx] = (dev, dev.children, new_net_id)
    logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                  new_net_id)
    new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                            logical_id=new_alone_id,
                            children=dev.children,
                            size=dev.size,
                            params={})
    (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                          self.cfg)
    try:
      CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                           anno_new_drbd,
                           GetInstanceInfoText(self.instance), False,
                           excl_stor)
    except errors.GenericError:
      # Give back the minors allocated above before propagating the error
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise

  # We have new devices, shutdown the drbd on the old secondary
  for idx, dev in enumerate(self.instance.disks):
    self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
    self.cfg.SetDiskID(dev, self.target_node_uuid)
    msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                          (dev, self.instance)).fail_msg
    if msg:
      # The message used to read "on oldnode" (missing space)
      self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                         " node: %s" % (idx, msg),
                         hint=("Please cleanup this device manually as"
                               " soon as possible"))

  self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
  result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                             self.instance.disks)[pnode]

  msg = result.fail_msg
  if msg:
    # detaches didn't succeed (unlikely)
    self.cfg.ReleaseDRBDMinors(self.instance.uuid)
    raise errors.OpExecError("Can't detach the disks from the network on"
                             " old node: %s" % (msg,))

  # if we managed to detach at least one, we update all the disks of
  # the instance to point to the new secondary
  self.lu.LogInfo("Updating instance configuration")
  for dev, _, new_logical_id in iv_names.itervalues():
    dev.logical_id = new_logical_id
    self.cfg.SetDiskID(dev, self.instance.primary_node)

  self.cfg.Update(self.instance, feedback_fn)

  # Release all node locks (the configuration has been updated)
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # and now perform the drbd attach
  self.lu.LogInfo("Attaching primary drbds to new secondary"
                  " (standalone => connected)")
  result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                          self.new_node_uuid],
                                         self.node_secondary_ip,
                                         (self.instance.disks, self.instance),
                                         self.instance.name,
                                         False)
  for to_node, to_result in result.items():
    msg = to_result.fail_msg
    if msg:
      self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                         self.cfg.GetNodeName(to_node), msg,
                         hint=("please do a gnt-instance info to see the"
                               " status of disks"))

  # Steps 5 and 6 swap places depending on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2639