Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Logical units dealing with storage of instances.""" 
  23   
  24  import itertools 
  25  import logging 
  26  import os 
  27  import time 
  28   
  29  from ganeti import compat 
  30  from ganeti import constants 
  31  from ganeti import errors 
  32  from ganeti import ht 
  33  from ganeti import locking 
  34  from ganeti.masterd import iallocator 
  35  from ganeti import objects 
  36  from ganeti import utils 
  37  from ganeti import opcodes 
  38  from ganeti import rpc 
  39  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  40  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  41    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \ 
  42    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  43    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks 
  44  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  45    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  46    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  47   
  48  import ganeti.masterd.instance 
  49   
  50   
# Prefix put in front of the ".disk<N>" suffix when GenerateDiskTemplate
# asks for unique volume names; templates that are not listed here get no
# generated names at all (the lookup falls back to None)
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


# Logical device type GenerateDiskTemplate assigns to the disks of each
# template handled by its generic (non-DRBD, non-diskless) branch
_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create one block device on the given node.

  Children of the device are not handled here; they have to exist
  before this function is called.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  create_res = lu.rpc.call_blockdev_create(node, device, device.size,
                                           instance.name, force_open, info,
                                           excl_stor)
  failure_msg = ("Can't create block device %s on node %s for instance %s" %
                 (device, node, instance.name))
  create_res.Raise(failure_msg)

  # Only record the physical ID reported by the node if none is known yet
  if device.physical_id is not None:
    return
  device.physical_id = create_res.payload
100
def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a device tree on a node, children first.

  Devices which must exist on secondaries are created together with all
  their children; for any other device we only descend into the children,
  handing the current 'force' value down unchanged.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  done = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    for child in (device.children or ()):
      done.extend(_CreateBlockDevInner(lu, node, instance, child,
                                       force_create, info, force_open,
                                       excl_stor))

    if force_create:
      CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                           excl_stor)
      # Once the device itself exists its sub-devices need not be tracked
      # separately any more, so the list collapses to the device alone
      done = [(node, device)]

    return done

  except errors.DeviceCreationError as err:
    # A deeper level failed: add what this level created and pass it on
    err.created_devices.extend(done)
    raise err
  except errors.OpExecError as err:
    raise errors.DeviceCreationError(str(err), done)
161
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Tell whether exclusive_storage applies to the node with this name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  node_info = cfg.GetNodeInfo(nodename)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)

  raise errors.OpPrereqError("Invalid node name %s" % nodename,
                             errors.ECODE_NOENT)
180
def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Annotate the root device, then delegate to L{_CreateBlockDevInner}.

  The exclusive_storage flag of the target node is looked up here so that
  the recursive helper does not have to.

  """
  annotated = AnnotateDiskParams(instance, [device], lu.cfg)
  (root_disk,) = annotated
  return _CreateBlockDevInner(lu, node, instance, root_disk, force_create,
                              info, force_open,
                              IsExclusiveStorageEnabledNodeName(lu.cfg, node))
193
def _UndoCreateDisks(lu, disks_created):
  """Remove the devices created by L{CreateDisks}.

  Used on the error path to roll back whatever L{CreateDisks} managed to
  create. Removal failures are only logged, never raised.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    failure = lu.rpc.call_blockdev_remove(node, disk).fail_msg
    if not failure:
      continue
    logging.warning("Failed to remove newly-created disk %s on node %s:"
                    " %s", disk, node, failure)
212
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create the disks of an instance on all relevant nodes.

  This abstracts away some work from AddInstance. If any device cannot be
  created, everything created so far is removed again before the error
  is reported.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpExecError: in case of error

  """
  info = GetInstanceInfoText(instance)

  if target_node is not None:
    pnode = target_node
    all_nodes = [target_node]
  else:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    # The storage directory is derived from the instance's first disk
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    dir_res = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
    dir_res.Raise("Failed to create directory '%s' on node %s" %
                  (file_storage_dir, pnode))

  created = []
  for (idx, device) in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      # Creation (and opening) is only forced on the primary node
      on_primary = (node == pnode)
      try:
        _CreateBlockDev(lu, node, instance, device, on_primary, info,
                        on_primary)
      except errors.DeviceCreationError as err:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Include the partially created devices in the rollback
        created.extend(err.created_devices)
        _UndoCreateDisks(lu, created)
        raise errors.OpExecError(err.message)
      else:
        created.append((node, device))

  return created
270
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @param disk_template: the disk template the disks will use
  @param disks: list of disk definitions (dicts); for LVM-backed templates
      each one must contain L{constants.IDISK_VG} and
      L{constants.IDISK_SIZE}
  @rtype: dict
  @return: mapping of volume group name to the total space required in it;
      empty for templates which do not consume volume group space
  @raise errors.ProgrammerError: if the disk template is not known here

  """
  def _compute(disks, payload):
    """Universal algorithm.

    Sums, per volume group, the size of every disk plus the fixed
    per-disk payload.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # The running total must be looked up under the disk's own VG name;
      # using the parameter name as key would reset the sum for each disk
      vgs[vg_name] = vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Per-disk overhead added for each template; None marks templates which
  # need no space in any volume group
  overhead = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: 0,
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: constants.DRBD_META_SIZE,
    constants.DT_FILE: None,
    constants.DT_SHARED_FILE: None,
    }

  if disk_template not in overhead:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  payload = overhead[disk_template]
  if payload is None:
    return {}

  # Sizes are only computed for the template actually requested
  return _compute(disks, payload)
def ComputeDisks(op, default_vg):
  """Validate the opcode's disk definitions and normalise them.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks
  @raise errors.OpPrereqError: on an invalid access mode, a missing or
      non-integer size, a provider given for a non-ext template, or a
      missing provider for the ext template

  """
  computed = []
  for spec in op.disks:
    mode = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    ext_provider = spec.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    # A name spelled like the "none" marker means "no name"
    name = spec.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None

    entry = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: spec.get(constants.IDISK_VG, default_vg),
      constants.IDISK_NAME: name,
      }

    # These two are carried over only when explicitly given
    for optional in (constants.IDISK_METAVG, constants.IDISK_ADOPT):
      if optional in spec:
        entry[optional] = spec[optional]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not ext_provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      entry[constants.IDISK_PROVIDER] = ext_provider
      for key in spec:
        if key not in constants.IDISK_PARAMS:
          entry[key] = spec[key]

    computed.append(entry)

  return computed
366
def CheckRADOSFreeSpace():
  """Check disk size requirements inside the RADOS cluster.

  Nothing is verified: a RADOS cluster is assumed to always have enough
  space.

  """
  return None
374
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Build a drbd8 disk object together with its data and meta volumes.

  @param lu: the logical unit on whose behalf we execute
  @param primary: first node of the DRBD logical ID
  @param secondary: second node of the DRBD logical ID
  @param size: size of the data volume and of the DRBD device
  @param vgnames: pair (data VG, metadata VG)
  @param names: pair (data LV name, metadata LV name)
  @param iv_name: the iv_name of the DRBD device
  @param p_minor: DRBD minor stored for the first node
  @param s_minor: DRBD minor stored for the second node
  @return: the DRBD L{objects.Disk} with both children attached

  """
  assert len(vgnames) == len(names) == 2

  def _NewUuid():
    """Request a fresh unique ID within the current execution context."""
    return lu.cfg.GenerateUniqueID(lu.proc.GetECId())

  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vgnames[0], names[0]),
                         params={})
  data_lv.uuid = _NewUuid()

  meta_lv = objects.Disk(dev_type=constants.LD_LV,
                         size=constants.DRBD_META_SIZE,
                         logical_id=(vgnames[1], names[1]),
                         params={})
  meta_lv.uuid = _NewUuid()

  drbd_id = (primary, secondary, port, p_minor, s_minor, shared_secret)
  drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=drbd_id,
                           children=[data_lv, meta_lv],
                           iv_name=iv_name, params={})
  drbd_disk.uuid = _NewUuid()
  return drbd_disk
402
def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @param template_name: the disk template to generate disks for
  @param instance_name: name of the instance owning the disks (used when
      allocating DRBD minors)
  @param primary_node: the primary node
  @param secondary_nodes: list of secondary nodes; must hold exactly one
      node for DRBD8 and be empty for every other template
  @param disk_info: list of disk definitions (dicts keyed by the
      C{constants.IDISK_*} parameters)
  @param file_storage_dir: directory used in the logical ID of file-based
      disks
  @param file_driver: driver used in the logical ID of file-based disks
  @param base_index: index of the first generated disk
  @param feedback_fn: function called with a progress message for each
      non-DRBD disk
  @param full_disk_params: disk parameters handed to
      L{objects.Disk.ComputeLDParams} for DRBD8
  @param _req_file_storage: check invoked for the file template
  @param _req_shr_file_storage: check invoked for the shared file template
  @return: list of L{objects.Disk} objects
  @raise errors.ProgrammerError: on an inconsistent node list, an unknown
      template or a missing ext provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # Two minors per disk: one for the primary, one for the secondary
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two LV names per disk, stored consecutively: data first, then meta
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    # Only templates with a registered prefix get generated volume names
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Pick the function building the logical ID for this template; all of
    # them take (position in disk_info, absolute disk index, disk dict)
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          # The message has to be interpolated explicitly; passing the
          # values as extra exception arguments leaves the "%s" untouched
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        # NOTE(review): a disk dict without the provider key fails here
        # with KeyError, before logical_id_fn can report it -- confirm
        # whether that is intended
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  The disks can be recreated on the instance's current nodes, on
  explicitly given replacement nodes (C{op.nodes}) or on nodes chosen by
  an iallocator (C{op.iallocator}). Only the size and the access mode of
  a disk can be changed while recreating it.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters the opcode is allowed to override (see CheckArguments)
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success the node list computed by the allocator is stored in
    C{self.op.nodes}.

    @raise errors.OpPrereqError: if the allocator reports a failure

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    """Normalize and validate the opcode arguments.

    A deprecated plain list of disk indices is converted into a list of
    (index, parameters) pairs; duplicate indices and attempts to change
    parameters outside L{_MODIFYABLE} are rejected.

    @raise errors.OpPrereqError: on duplicate disks or unmodifiable
        parameters

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Expand node names and declare the initial lock requirements.

    Explicitly given nodes are locked directly; otherwise the node locks
    are filled in later by L{DeclareLocks}. With an iallocator the node
    group and node allocation levels are requested as well.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    @param level: the locking level being processed

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the same list (master node plus all instance nodes) for both
        elements of the returned pair

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.
    It also validates the replacement nodes and the selected disk indices,
    stores the selection in C{self.disks} and, if requested, runs the
    iallocator.

    @raise errors.OpPrereqError: if any of the checks fails

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    # Mapping of disk index to the parameter changes requested for it
    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    # NOTE(review): max() raises ValueError if self.disks ends up empty,
    # i.e. for a non-diskless instance without any disks -- confirm that
    # this cannot happen
    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    # NOTE: comparing a sorted list against range() relies on range()
    # returning a list, as it does in Python 2
    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    The disk objects are first updated (new DRBD logical IDs when the
    nodes change, plus any size/mode changes), then the selected disks are
    created and, if the cluster has C{prealloc_wipe_disks} set, wiped.

    @param feedback_fn: function used to send feedback back to the caller

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        # The port and the shared secret are kept; nodes and minors change
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    # NOTE(review): the instance is written back to the configuration only
    # when the nodes change; it looks like size/mode changes made above
    # without a node change are not saved by this method -- confirm
    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      # Every recreated disk is wiped from offset 0
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
827
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Verify that every node has enough free space in one volume group.

  All given nodes are queried in a single RPC call. If a node cannot be
  queried, reports no usable free-space figure or has less free space
  than requested, an OpPrereqError exception is raised.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  excl_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  replies = lu.rpc.call_node_info(nodenames, [vg], None, excl_flags)

  for name in nodenames:
    reply = replies[name]
    reply.Raise("Cannot get current information from node %s" % name,
                prereq=True, ecode=errors.ECODE_ENVIRON)

    # The payload is a triple whose middle element holds exactly one entry,
    # the data for the single volume group that was asked about
    (_, (vg_data, ), _) = reply.payload
    free_mib = vg_data.get("vg_free", None)

    if not isinstance(free_mib, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (name, vg, free_mib), errors.ECODE_ENVIRON)

    if not (requested > free_mib):
      continue

    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (name, vg, requested, free_mib),
                               errors.ECODE_NORES)
866
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Verify that every node has enough free space in each volume group.

  Each (volume group, size) pair is checked in turn against all given
  nodes; the first node which cannot be queried or lacks space makes the
  check raise an OpPrereqError exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for (vg_name, needed_mib) in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg_name, needed_mib)
889
890 891 -def _DiskSizeInBytesToMebibytes(lu, size):
892 """Converts a disk size in bytes to mebibytes. 893 894 Warns and rounds up if the size isn't an even multiple of 1 MiB. 895 896 """ 897 (mib, remainder) = divmod(size, 1024 * 1024) 898 899 if remainder != 0: 900 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 901 " to not overwrite existing data (%s bytes will not be" 902 " wiped)", (1024 * 1024) - remainder) 903 mib += 1 904 905 return mib
906
907 908 -def _CalcEta(time_taken, written, total_size):
909 """Calculates the ETA based on size written and total size. 910 911 @param time_taken: The time taken so far 912 @param written: amount written so far 913 @param total_size: The total size of data to be written 914 @return: The remaining time in seconds 915 916 """ 917 avg_time = time_taken / float(written) 918 return (total_size - written) * avg_time
919
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node while wiping and is
  always resumed afterwards, even if wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from offset 0

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # For very small disks the truncation would yield 0, which would make
      # the loop below spin forever without progress, hence the lower bound.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 *
                       constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      # Remember where this run started; only data past this point is
      # actually written and may be used for speed/ETA calculations
      start_offset = offset
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once per minute (last_output starts at 0,
        # so the first chunk is always reported)
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    # Failing to resume is only warned about, so that an error raised while
    # wiping is not masked by a second one
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1014
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Run L{WipeDisks} and undo the disk creation if wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, handed over to
      L{_UndoCreateDisks} in case of error
  @raise errors.OpExecError: re-raised after the cleanup if the wipe failed

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed", instance.name)
    # Roll back first, then let the caller see the original error
    _UndoCreateDisks(lu, cleanup)
    raise
1036
def ExpandCheckDisks(instance, disks):
  """Resolve a disk selection against the disks of an instance.

  @param instance: the instance; its C{disks} attribute is consulted
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks, or None to select all disks of the instance
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on
  @raise errors.ProgrammerError: if the selection contains a disk that is
      not part of the instance

  """
  if disks is None:
    return instance.disks

  if set(disks).issubset(instance.disks):
    return disks

  raise errors.ProgrammerError("Can only act on disks belonging to the"
                               " target instance: expected a subset of %r,"
                               " got %r" % (instance.disks, disks))
1055
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  The mirror status is queried on the primary node until no device reports
  a sync in progress any more (or just once, if C{oneshot} is set).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are checked
  @type disks: list of L{objects.Disk} or None
  @param disks: the disks to wait for (or all, if None)
  @type oneshot: boolean
  @param oneshot: if true, do not wait for the sync to finish; only the
      short retries used to confirm a degraded state are still done
  @rtype: boolean
  @return: True if no device was found degraded (also when there is
      nothing to check), False otherwise
  @raise errors.RemoteError: if the node could not be queried ten times
      in a row

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      # A device that is degraded while still syncing is expected; only
      # degradation without a running sync counts as a problem
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          # Keep the longest estimate over all devices, so the poll interval
          # doesn't depend on the order in which devices are reported
          max_time = max(max_time, mstat.estimated_time)
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # Poll again once the slowest device is expected to be finished, but at
    # least once per minute
    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1131
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  The instance's disks are first marked inactive in the configuration,
  then every selected disk is shut down on all nodes of its node tree.

  A failure on the primary node makes the function return False unless
  C{ignore_primary} is true; a failure on any other node does so unless
  the RPC result flags that node as offline.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down (or all, if None)
  @param ignore_primary: whether failures on the primary node are ignored
  @return: True if no failure that counts was seen, False otherwise

  """
  lu.cfg.MarkInstanceDisksInactive(instance.name)
  success = True
  selected = ExpandCheckDisks(instance, disks)

  for disk in selected:
    for (node, top_disk) in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      err = result.fail_msg
      if not err:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, node, err)
      if node == instance.primary_node:
        failure_counts = not ignore_primary
      else:
        failure_counts = not result.offline
      if failure_counts:
        success = False

  return success
1158
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance, but only if it is down.

  The instance state is verified with L{CheckInstanceState} (which is
  asked to require C{INSTANCE_DOWN}) before L{ShutdownInstanceDisks} is
  called. The result of the shutdown itself is not returned.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: the disks to shut down (or all, if None)

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1169
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes, in two passes: first on
  every node in secondary mode, then once more on the primary node only,
  in primary mode.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors in the first pass
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: C{(disks_ok, device_info)}; C{disks_ok} is False if the
      operation failed, C{device_info} is a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  def _DiskForRpc(node_disk, node):
    """Return the disk object to send to C{node}, with its ID set."""
    if ignore_size:
      # Work on a copy so the recorded size of the real object is kept
      node_disk = node_disk.Copy()
      node_disk.UnsetSize()
    lu.cfg.SetDiskID(node_disk, node)
    return node_disk

  mapping = []
  all_ok = True
  instance_name = instance.name
  selected = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance_name)

  # 1st pass, assemble on all nodes in secondary mode
  for (disk_index, inst_disk) in enumerate(selected):
    for (node, node_disk) in inst_disk.ComputeNodeTree(instance.primary_node):
      rpc_disk = _DiskForRpc(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (rpc_disk, instance),
                                             instance_name, False, disk_index)
      err = result.fail_msg
      if not err:
        continue

      offline_secondary = (node in instance.secondary_nodes and
                           result.offline)
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, node, err)
      if not ignore_secondaries and not offline_secondary:
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for (disk_index, inst_disk) in enumerate(selected):
    device_path = None

    for (node, node_disk) in inst_disk.ComputeNodeTree(instance.primary_node):
      if node == instance.primary_node:
        rpc_disk = _DiskForRpc(node_disk, node)
        result = lu.rpc.call_blockdev_assemble(node, (rpc_disk, instance),
                                               instance_name, True,
                                               disk_index)
        err = result.fail_msg
        if err:
          lu.LogWarning("Could not prepare block device %s on node %s"
                        " (is_primary=True, pass=2): %s",
                        inst_disk.iv_name, node, err)
          all_ok = False
        else:
          device_path = result.payload

    mapping.append((instance.primary_node, inst_disk.iv_name, device_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in selected:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not all_ok:
    lu.cfg.MarkInstanceDisksInactive(instance_name)

  return all_ok, mapping
1268
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  If the disks cannot be assembled they are shut down again and the
  operation is aborted.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are started
  @param force: passed to L{AssembleInstanceDisks} as
      C{ignore_secondaries}; if it is neither None nor true, a hint about
      '--force' is logged on failure
  @raise errors.OpExecError: if the disks could not be assembled

  """
  (disks_ok, _) = AssembleInstanceDisks(lu, instance,
                                        ignore_secondaries=force)
  if disks_ok:
    return

  ShutdownInstanceDisks(lu, instance)
  if not (force is None or force):
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1283
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node and node resource locks are computed later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the node locks and mirror them at the node resource level.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the master node plus all nodes of the instance, used both as
        pre and post hook node list

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes are
    online, that its disk template is growable and that the requested size
    change is not negative. It also computes C{self.delta} (the amount to
    grow by) and C{self.target} (the resulting size) and verifies the free
    disk space.

    @raise errors.OpPrereqError: if one of the checks fails

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # The amount is the wanted final size
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # The amount is the increment
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    """Check that the nodes can accommodate the growth.

    Nothing is checked for disk templates listed in
    C{constants.DTS_NO_FREE_SPACE_CHECK}.

    @param nodenames: names of the nodes to check
    @param req_vgspace: the space requirements, as expected by
        L{CheckNodesFreeDiskPerVG}
    @raise errors.OpPrereqError: if exclusive storage is enabled on any of
        the nodes, or if there is not enough free space

    """
    template = self.instance.disk_template
    if template in constants.DTS_NO_FREE_SPACE_CHECK:
      return

    # TODO: check the free disk space for file, when that feature will be
    # supported
    # any() gives a real boolean no matter whether filter()/map() are lazy,
    # and stops at the first node with exclusive storage
    uses_exclusive_storage = \
      any(IsExclusiveStorageEnabledNode(self.cfg, self.cfg.GetNodeInfo(name))
          for name in nodenames)
    if uses_exclusive_storage:
      # With exclusive storage we need to something smarter than just looking
      # at free space; for now, let's simply abort the operation.
      raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                 " is enabled", errors.ECODE_STATE)
    CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The grow request is first sent to all nodes in dry-run mode, then for
    real on the backing storage of all nodes and finally for the logical
    storage on the primary node. If the cluster has C{prealloc_wipe_disks}
    set, the newly added space is wiped afterwards.

    @param feedback_fn: function used to send feedback to the caller
    @raise errors.OpExecError: if the disk can't be activated or its
        current size can't be determined reliably

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(disk, instance.primary_node)
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      # The wipe starts at old_disk_size; a value below the recorded size
      # would destroy existing data, so this must hold even when assertions
      # are disabled
      if old_disk_size < disk.size:
        raise errors.OpExecError("Retrieved disk size too small (got %s,"
                                 " should be at least %s)" %
                                 (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      # The disk was only activated for growing, deactivate it again
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1481
class LUInstanceReplaceDisks(LogicalUnit):
  """Logical unit replacing the disks of an instance.

  The replacement itself is carried out by the L{TLReplaceDisks} tasklet
  that is set up in L{ExpandNames}.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Validate the combination of mode, new node and iallocator.

    @raise errors.OpPrereqError: if a new node or an iallocator is given
        without changing the secondary, or if neither is given when
        changing it

    """
    new_node = self.op.remote_node
    allocator = self.op.iallocator
    neither_given = new_node is None and allocator is None

    if self.op.mode != constants.REPLACE_DISK_CHG:
      # Not replacing the secondary
      if not neither_given:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " only be used when changing the"
                                   " secondary node", errors.ECODE_INVAL)
      return

    if neither_given:
      raise errors.OpPrereqError("When changing the secondary either an"
                                 " iallocator script must be used or the"
                                 " new node given", errors.ECODE_INVAL)

    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    """Lock the instance, prepare the other lock levels, create the tasklet.

    """
    self._ExpandAndLockInstance()

    for unset_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES,
                        locking.LEVEL_NODEGROUP):
      assert unset_level not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is None:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
    else:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Compute the locks needed at the given level.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
      self.needed_locks[locking.LEVEL_NODEGROUP] = instance_groups
      return

    if level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      node_locks = self.needed_locks[locking.LEVEL_NODE]
      self.needed_locks[locking.LEVEL_NODE_RES] = node_locks
      return

    if level != locking.LEVEL_NODE:
      return

    if self.op.iallocator is None:
      assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      self._LockInstancesNodes()
      return

    assert self.op.remote_node is None
    assert not self.needed_locks[locking.LEVEL_NODE]
    assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

    # Lock member nodes of all locked groups
    group_members = []
    for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
      group_members.extend(self.cfg.GetNodeGroup(group_uuid).members)
    self.needed_locks[locking.LEVEL_NODE] = group_members

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: the mode, the new and the old secondary node, plus the
        environment built by L{BuildInstanceHookEnvByObject}

    """
    instance = self.replacer.instance
    env = {}
    env["MODE"] = self.op.mode
    env["NEW_SECONDARY"] = self.op.remote_node
    env["OLD_SECONDARY"] = instance.secondary_nodes[0]
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the master node, the primary node and, if one was given, the
        new node; the same list is used for pre and post hooks

    """
    instance = self.replacer.instance
    hook_nodes = [self.cfg.GetMasterNode(), instance.primary_node]
    if self.op.remote_node is not None:
      hook_nodes.append(self.op.remote_node)
    return hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    Re-verifies the optimistically acquired node group locks (if any) and
    then runs the generic L{LogicalUnit.CheckPrereq}.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    locked_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if locked_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, locked_groups)

    return LogicalUnit.CheckPrereq(self)
1622
class LUInstanceActivateDisks(NoHooksLU):
  """Logical unit bringing up the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the nodes of the instance at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its primary
    node is online.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    @return: the device information computed by L{AssembleInstanceDisks}
    @raise errors.OpExecError: if the disks can't be assembled, or if
        waiting for the sync was requested and disks are degraded

    """
    (assembled, device_info) = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      self.cfg.MarkInstanceDisksInactive(self.instance.name)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return device_info
1666
class LUInstanceDeactivateDisks(NoHooksLU):
  """Logical unit shutting down the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the nodes of the instance at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    With C{force} the disks are shut down unconditionally, otherwise
    L{_SafeShutdownInstanceDisks} is used.

    """
    instance = self.instance
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, instance)
    else:
      ShutdownInstanceDisks(self, instance)
1702
1703 1704 -def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary, 1705 ldisk=False):
1706 """Check that mirrors are not degraded. 1707 1708 @attention: The device has to be annotated already. 1709 1710 The ldisk parameter, if True, will change the test from the 1711 is_degraded attribute (which represents overall non-ok status for 1712 the device(s)) to the ldisk (representing the local storage status). 1713 1714 """ 1715 lu.cfg.SetDiskID(dev, node) 1716 1717 result = True 1718 1719 if on_primary or dev.AssembleOnSecondary(): 1720 rstats = lu.rpc.call_blockdev_find(node, dev) 1721 msg = rstats.fail_msg 1722 if msg: 1723 lu.LogWarning("Can't find disk on node %s: %s", node, msg) 1724 result = False 1725 elif not rstats.payload: 1726 lu.LogWarning("Can't find disk on node %s", node) 1727 result = False 1728 else: 1729 if ldisk: 1730 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 1731 else: 1732 result = result and not rstats.payload.is_degraded 1733 1734 if dev.children: 1735 for child in dev.children: 1736 result = result and _CheckDiskConsistencyInner(lu, instance, child, node, 1737 on_primary) 1738 1739 return result
1740
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Annotate a device and run L{_CheckDiskConsistencyInner} on it.

  The parameters other than C{dev} are passed through unchanged; C{dev}
  is replaced by the result of L{AnnotateDiskParams}.

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # Exactly one device went in, exactly one is expected back
  [annotated_dev] = annotated
  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node,
                                    on_primary, ldisk=ldisk)
1749
def _BlockdevFind(lu, node, dev, instance):
  """Look up a device on a node after annotating its disk parameters.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  # Exactly one device went in, exactly one is expected back
  [annotated_dev] = annotated
  return lu.rpc.call_blockdev_find(node, annotated_dev)
1763
1764 1765 -def _GenerateUniqueNames(lu, exts):
1766 """Generate a suitable LV name. 1767 1768 This will generate a logical volume name for the given instance. 1769 1770 """ 1771 results = [] 1772 for val in exts: 1773 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 1774 results.append("%s%s" % (new_id, val)) 1775 return results
1776
1777 1778 -class TLReplaceDisks(Tasklet):
1779 """Replaces disks for an instance. 1780 1781 Note: Locking is not within the scope of this class. 1782 1783 """
def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
             disks, early_release, ignore_ipolicy):
  """Store the parameters of the replacement and reset the runtime data.

  All parameters except C{lu} (which is handed to L{Tasklet.__init__})
  are stored unchanged in attributes of the same name.

  """
  Tasklet.__init__(self, lu)

  # Parameters
  (self.instance_name, self.mode) = (instance_name, mode)
  (self.iallocator_name, self.remote_node) = (iallocator_name, remote_node)
  self.disks = disks
  (self.early_release, self.ignore_ipolicy) = (early_release, ignore_ipolicy)

  # Runtime data, not known yet
  self.instance = self.new_node = self.target_node = None
  self.other_node = self.remote_node_info = self.node_secondary_ip = None
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
  """Compute a new secondary node using an IAllocator.

  @param lu: the logical unit providing C{cfg}, C{rpc} and C{LogInfo}
  @param iallocator_name: name of the iallocator to run
  @param instance_name: name of the instance to relocate
  @param relocate_from: the nodes to relocate away from
  @return: the first node of the iallocator result
  @raise errors.OpPrereqError: if the iallocator run was not successful

  """
  request = iallocator.IAReqRelocate(name=instance_name,
                                     relocate_from=list(relocate_from))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if allocator.success:
    chosen_node = allocator.result[0]
    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, chosen_node)
    return chosen_node

  raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                             " %s" % (iallocator_name, allocator.info),
                             errors.ECODE_NORES)
1830
def _FindFaultyDisks(self, node_name):
  """Call L{FindFaultyInstanceDisks} for this tasklet's instance.

  @param node_name: the node passed on to L{FindFaultyInstanceDisks}
  @return: whatever L{FindFaultyInstanceDisks} returns

  """
  faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)
  return faulty
1837
1838 - def _CheckDisksActivated(self, instance):
1839 """Checks if the instance disks are activated. 1840 1841 @param instance: The instance to check disks 1842 @return: True if they are activated, False otherwise 1843 1844 """ 1845 nodes = instance.all_nodes 1846 1847 for idx, dev in enumerate(instance.disks): 1848 for node in nodes: 1849 self.lu.LogInfo("Checking disk/%d on %s", idx, node) 1850 self.cfg.SetDiskID(dev, node) 1851 1852 result = _BlockdevFind(self, node, dev, instance) 1853 1854 if result.offline: 1855 continue 1856 elif result.fail_msg or not result.payload: 1857 return False 1858 1859 return True
1860
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster, that it is a DRBD8
  instance with exactly one secondary node, and then works out, depending
  on C{self.mode}, which nodes and disks the replacement will touch.

  On success the following attributes have been set: C{self.instance},
  C{self.remote_node_info}, C{self.disks}, C{self.node_secondary_ip} and,
  as far as the mode requires them, C{self.target_node},
  C{self.other_node} and C{self.new_node}. Node and node resource locks
  for nodes that are not touched, the node allocation lock and any node
  group locks are released before returning.

  @raise errors.OpPrereqError: if the instance layout, the chosen node or
      the requested disks are not acceptable
  @raise errors.ProgrammerError: if C{self.mode} is not a handled mode

  """
  self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
  assert instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  if instance.disk_template != constants.DT_DRBD8:
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  if len(instance.secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(instance.secondary_nodes),
                               errors.ECODE_FAULT)

  # NOTE(review): this rebinds "instance" to the value it already has
  instance = self.instance
  secondary_node = instance.secondary_nodes[0]

  # The new node is either given explicitly or computed by the iallocator
  if self.iallocator_name is None:
    remote_node = self.remote_node
  else:
    remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                     instance.name, instance.secondary_nodes)

  if remote_node is None:
    self.remote_node_info = None
  else:
    assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
      "Remote node '%s' is not locked" % remote_node

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node

  if remote_node == self.instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node == secondary_node:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  # In these two modes the set of disks is decided here, not by the caller
  if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                  constants.REPLACE_DISK_CHG):
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)
    faulty_primary = self._FindFaultyDisks(instance.primary_node)
    faulty_secondary = self._FindFaultyDisks(secondary_node)

    if faulty_primary and faulty_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    # The node holding the faulty disks becomes the target of the replace
    if faulty_primary:
      self.disks = faulty_primary
      self.target_node = instance.primary_node
      self.other_node = secondary_node
      check_nodes = [self.target_node, self.other_node]
    elif faulty_secondary:
      self.disks = faulty_secondary
      self.target_node = secondary_node
      self.other_node = instance.primary_node
      check_nodes = [self.target_node, self.other_node]
    else:
      # Nothing is faulty; Exec reports that no disks need replacement
      self.disks = []
      check_nodes = []

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node = instance.primary_node
      self.other_node = secondary_node
      check_nodes = [self.target_node, self.other_node]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node = secondary_node
      self.other_node = instance.primary_node
      check_nodes = [self.target_node, self.other_node]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node = remote_node
      self.other_node = instance.primary_node
      self.target_node = secondary_node
      # The old secondary (target_node) is not required to be online here
      check_nodes = [self.new_node, self.other_node]

      CheckNodeNotDrained(self.lu, remote_node)
      CheckNodeVmCapable(self.lu, remote_node)

      old_node_info = self.cfg.GetNodeInfo(secondary_node)
      assert old_node_info is not None
      if old_node_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode", secondary_node)

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                           self.cfg, ignore=self.ignore_ipolicy)

  for node in check_nodes:
    CheckNodeOnline(self.lu, node)

  # Only the nodes that were actually assigned above are kept locked
  touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                        self.other_node,
                                                        self.target_node]
                            if node_name is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                in self.cfg.GetMultiNodeInfo(touched_nodes))
2013
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler:
  L{_ExecDrbd8Secondary} when a new node was selected (C{self.new_node}
  is set), L{_ExecDrbd8DiskOnly} otherwise. If the instance's disks are
  not active they are started before the handler runs and shut down
  again afterwards, also when the handler fails.

  @param feedback_fn: callable taking a message string; used for progress
      messages and passed on to the handler
  @return: C{None} if there are no disks to replace, otherwise the return
      value of the handler

  """
  if __debug__:
    # Verify owned locks before starting operation
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(owned_nodes) == set(self.node_secondary_ip), \
      ("Incorrect node locks, owning %s, expected %s" %
       (owned_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))
    assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(owned_instances) == [self.instance_name], \
      "Instance '%s' not locked" % self.instance_name

    assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
      "Should not own any node group lock at this point"

  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" % self.instance.primary_node)
  # The message used to read "seconary"
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(self.instance.secondary_nodes))

  activate_disks = not self.instance.disks_active

  # Activate the instance disks if we're replacing them on a down instance
  if activate_disks:
    StartInstanceDisks(self.lu, self.instance, True)

  try:
    # Should we replace the secondary node?
    if self.new_node is not None:
      fn = self._ExecDrbd8Secondary
    else:
      fn = self._ExecDrbd8DiskOnly

    result = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a
    # down instance
    if activate_disks:
      _SafeShutdownInstanceDisks(self.lu, self.instance)

  # Both handlers release all node locks before returning
  assert not self.lu.owned_locks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    nodes = frozenset(self.node_secondary_ip)
    assert ((self.early_release and not owned_nodes) or
            (not self.early_release and not (set(owned_nodes) - nodes))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, owned_nodes, nodes))

  return result
2080
def _CheckVolumeGroup(self, nodes):
  """Make sure the configured volume group exists on all given nodes.

  @param nodes: names of the nodes to query
  @raise errors.OpExecError: if the volume group listing is empty or the
      volume group named by the configuration is missing on a node

  """
  self.lu.LogInfo("Checking volume groups")

  wanted_vg = self.cfg.GetVGName()

  vg_lists = self.rpc.call_vg_list(nodes)
  if not vg_lists:
    raise errors.OpExecError("Can't list volume groups on the nodes")

  for node_name in nodes:
    node_result = vg_lists[node_name]
    node_result.Raise("Error checking node %s" % node_name)
    if wanted_vg in node_result.payload:
      continue
    raise errors.OpExecError("Volume group '%s' not found on node %s" %
                             (wanted_vg, node_name))
2097
def _CheckDisksExistence(self, nodes):
  """Check that every disk selected for replacement exists on the nodes.

  Only disks whose index is in C{self.disks} are looked at.

  @param nodes: names of the nodes on which each disk must be found
  @raise errors.OpExecError: if a disk lookup fails or returns an empty
      payload; the message gets an extra hint when the instance disks do
      not seem to be activated

  """
  for disk_idx, disk in enumerate(self.instance.disks):
    if disk_idx in self.disks:
      for node_name in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", disk_idx, node_name)
        self.cfg.SetDiskID(disk, node_name)

        found = _BlockdevFind(self, node_name, disk, self.instance)

        problem = found.fail_msg
        if not problem and found.payload:
          # Disk is present on this node
          continue

        if not problem:
          problem = "disk not found"
        if self._CheckDisksActivated(self.instance):
          extra_hint = ""
        else:
          extra_hint = ("\nDisks seem to be not properly activated. Try"
                        " running activate-disks on the instance before"
                        " using replace-disks.")
        raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                 (disk_idx, node_name, problem, extra_hint))
2122
def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
  """Run L{CheckDiskConsistency} on every disk selected for replacement.

  Only disks whose index is in C{self.disks} are checked.

  @param node_name: name of the node to check
  @param on_primary: passed through to L{CheckDiskConsistency}
  @param ldisk: passed through to L{CheckDiskConsistency} as C{ldisk}
  @raise errors.OpExecError: if a disk is reported as not consistent

  """
  for disk_idx, disk in enumerate(self.instance.disks):
    if disk_idx in self.disks:
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (disk_idx, node_name))

      consistent = CheckDiskConsistency(self.lu, self.instance, disk,
                                        node_name, on_primary, ldisk=ldisk)
      if not consistent:
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))
2136
def _CreateNewStorage(self, node_name):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every disk whose index is in C{self.disks}, a new data LV and a new
  meta LV are created on the node under freshly generated names.

  @param node_name: name of the node on which the new LVs are created
  @return: dict mapping each handled disk's C{iv_name} to a tuple
      C{(dev, old_lvs, new_lvs)}, where C{old_lvs} are copies of the
      disk's current children and C{new_lvs} the newly created LVs
  @raise errors.OpExecError: if creating a block device fails

  """
  iv_names = {}

  disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

    self.cfg.SetDiskID(dev, node_name)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    # The new LVs reuse the volume group and params of the current children
    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.LD_LV,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    # Copies, so that callers can modify them without touching dev.children
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError, e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2184
def _CheckDevices(self, node_name, iv_names):
  """Check that every device in C{iv_names} is found and not degraded.

  @param node_name: name of the node on which the devices are looked up
  @param iv_names: dict whose values are 3-tuples with the device to check
      as first element; the keys are only used in error messages
  @raise errors.OpExecError: if a device cannot be found or is degraded

  """
  for name, entry in iv_names.iteritems():
    (drbd_dev, _, _) = entry
    self.cfg.SetDiskID(drbd_dev, node_name)

    found = _BlockdevFind(self, node_name, drbd_dev, self.instance)

    problem = found.fail_msg
    if problem or not found.payload:
      raise errors.OpExecError("Can't find DRBD device %s: %s" %
                               (name, problem or "disk not found"))

    if found.payload.is_degraded:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)
2200
def _RemoveOldStorage(self, node_name, iv_names):
  """Remove the old logical volumes recorded in C{iv_names}.

  Removal failures are only logged as warnings; they do not abort the
  loop.

  @param node_name: name of the node from which the LVs are removed
  @param iv_names: dict whose values are 3-tuples with the list of old LVs
      as second element; the keys are only used for logging

  """
  for name, (_, stale_lvs, _) in iv_names.iteritems():
    self.lu.LogInfo("Remove logical volumes for %s", name)

    for stale_lv in stale_lvs:
      self.cfg.SetDiskID(stale_lv, node_name)

      removal = self.rpc.call_blockdev_remove(node_name, stale_lv)
      failure = removal.fail_msg
      if not failure:
        continue
      self.lu.LogWarning("Can't remove old LV: %s", failure,
                         hint="remove unused LVs manually")
2212
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: unused here; accepted so that both replace handlers
      can be called the same way
  @raise errors.OpExecError: if adding the new LVs to a drbd device fails
      (other failures are raised by the helpers and RPC results used)

  """
  steps_total = 6

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node, self.target_node])
  self._CheckVolumeGroup([self.target_node, self.other_node])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(self.other_node,
                              self.other_node == self.instance.primary_node,
                              False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                   old_lvs)
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" % (self.target_node, dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == physical_id (which in
    # turn is the unique_id on that node)

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.physical_id[0],
                              d.physical_id[1] + "_replaced-%s" % suff)

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node, to_ren)
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" % self.target_node)

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.physical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" % self.target_node)

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id
      self.cfg.SetDiskID(new, self.target_node)

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)
      self.cfg.SetDiskID(disk, self.target_node)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
    result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                (dev, self.instance), new_lvs)
    msg = result.fail_msg
    if msg:
      # Best-effort rollback: remove the LVs we could not attach
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                             new_lv).fail_msg
        if msg2:
          # The two literals used to join without a space, giving
          # "logicalvolumes"
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # Steps 5 and 6 swap order depending on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)
2354
2355 - def _ExecDrbd8Secondary(self, feedback_fn):
2356 """Replace the secondary node for DRBD 8. 2357 2358 The algorithm for replace is quite complicated: 2359 - for all disks of the instance: 2360 - create new LVs on the new node with same names 2361 - shutdown the drbd device on the old secondary 2362 - disconnect the drbd network on the primary 2363 - create the drbd device on the new secondary 2364 - network attach the drbd on the primary, using an artifice: 2365 the drbd code for Attach() will connect to the network if it 2366 finds a device which is connected to the good local disks but 2367 not network enabled 2368 - wait for sync across all devices 2369 - remove all disks from the old secondary 2370 2371 Failures are not very well handled. 2372 2373 """ 2374 steps_total = 6 2375 2376 pnode = self.instance.primary_node 2377 2378 # Step: check device activation 2379 self.lu.LogStep(1, steps_total, "Check device existence") 2380 self._CheckDisksExistence([self.instance.primary_node]) 2381 self._CheckVolumeGroup([self.instance.primary_node]) 2382 2383 # Step: check other node consistency 2384 self.lu.LogStep(2, steps_total, "Check peer consistency") 2385 self._CheckDisksConsistency(self.instance.primary_node, True, True) 2386 2387 # Step: create new storage 2388 self.lu.LogStep(3, steps_total, "Allocate new storage") 2389 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) 2390 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node) 2391 for idx, dev in enumerate(disks): 2392 self.lu.LogInfo("Adding new local storage on %s for disk/%d" % 2393 (self.new_node, idx)) 2394 # we pass force_create=True to force LVM creation 2395 for new_lv in dev.children: 2396 try: 2397 _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv, 2398 True, GetInstanceInfoText(self.instance), False, 2399 excl_stor) 2400 except errors.DeviceCreationError, e: 2401 raise errors.OpExecError("Can't create block device: %s" % e.message) 2402 2403 # Step 4: dbrd minors and drbd setups changes 
2404 # after this, we must manually remove the drbd minors on both the 2405 # error and the success paths 2406 self.lu.LogStep(4, steps_total, "Changing drbd configuration") 2407 minors = self.cfg.AllocateDRBDMinor([self.new_node 2408 for dev in self.instance.disks], 2409 self.instance.name) 2410 logging.debug("Allocated minors %r", minors) 2411 2412 iv_names = {} 2413 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): 2414 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % 2415 (self.new_node, idx)) 2416 # create new devices on new_node; note that we create two IDs: 2417 # one without port, so the drbd will be activated without 2418 # networking information on the new node at this stage, and one 2419 # with network, for the latter activation in step 4 2420 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id 2421 if self.instance.primary_node == o_node1: 2422 p_minor = o_minor1 2423 else: 2424 assert self.instance.primary_node == o_node2, "Three-node instance?" 
2425 p_minor = o_minor2 2426 2427 new_alone_id = (self.instance.primary_node, self.new_node, None, 2428 p_minor, new_minor, o_secret) 2429 new_net_id = (self.instance.primary_node, self.new_node, o_port, 2430 p_minor, new_minor, o_secret) 2431 2432 iv_names[idx] = (dev, dev.children, new_net_id) 2433 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, 2434 new_net_id) 2435 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, 2436 logical_id=new_alone_id, 2437 children=dev.children, 2438 size=dev.size, 2439 params={}) 2440 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd], 2441 self.cfg) 2442 try: 2443 CreateSingleBlockDev(self.lu, self.new_node, self.instance, 2444 anno_new_drbd, 2445 GetInstanceInfoText(self.instance), False, 2446 excl_stor) 2447 except errors.GenericError: 2448 self.cfg.ReleaseDRBDMinors(self.instance.name) 2449 raise 2450 2451 # We have new devices, shutdown the drbd on the old secondary 2452 for idx, dev in enumerate(self.instance.disks): 2453 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) 2454 self.cfg.SetDiskID(dev, self.target_node) 2455 msg = self.rpc.call_blockdev_shutdown(self.target_node, 2456 (dev, self.instance)).fail_msg 2457 if msg: 2458 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" 2459 "node: %s" % (idx, msg), 2460 hint=("Please cleanup this device manually as" 2461 " soon as possible")) 2462 2463 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") 2464 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip, 2465 self.instance.disks)[pnode] 2466 2467 msg = result.fail_msg 2468 if msg: 2469 # detaches didn't succeed (unlikely) 2470 self.cfg.ReleaseDRBDMinors(self.instance.name) 2471 raise errors.OpExecError("Can't detach the disks from the network on" 2472 " old node: %s" % (msg,)) 2473 2474 # if we managed to detach at least one, we update all the disks of 2475 # the instance to point to the new secondary 2476 
self.lu.LogInfo("Updating instance configuration") 2477 for dev, _, new_logical_id in iv_names.itervalues(): 2478 dev.logical_id = new_logical_id 2479 self.cfg.SetDiskID(dev, self.instance.primary_node) 2480 2481 self.cfg.Update(self.instance, feedback_fn) 2482 2483 # Release all node locks (the configuration has been updated) 2484 ReleaseLocks(self.lu, locking.LEVEL_NODE) 2485 2486 # and now perform the drbd attach 2487 self.lu.LogInfo("Attaching primary drbds to new secondary" 2488 " (standalone => connected)") 2489 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, 2490 self.new_node], 2491 self.node_secondary_ip, 2492 (self.instance.disks, self.instance), 2493 self.instance.name, 2494 False) 2495 for to_node, to_result in result.items(): 2496 msg = to_result.fail_msg 2497 if msg: 2498 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", 2499 to_node, msg, 2500 hint=("please do a gnt-instance info to see the" 2501 " status of disks")) 2502 2503 cstep = itertools.count(5) 2504 2505 if self.early_release: 2506 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2507 self._RemoveOldStorage(self.target_node, iv_names) 2508 # TODO: Check if releasing locks early still makes sense 2509 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) 2510 else: 2511 # Release all resource locks except those used by the instance 2512 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, 2513 keep=self.node_secondary_ip.keys()) 2514 2515 # TODO: Can the instance lock be downgraded here? Take the optional disk 2516 # shutdown in the caller into consideration. 
2517 2518 # Wait for sync 2519 # This can fail as the old devices are degraded and _WaitForSync 2520 # does a combined result over all disks, so we don't check its return value 2521 self.lu.LogStep(cstep.next(), steps_total, "Sync devices") 2522 WaitForSync(self.lu, self.instance) 2523 2524 # Check all devices manually 2525 self._CheckDevices(self.instance.primary_node, iv_names) 2526 2527 # Step: remove old storage 2528 if not self.early_release: 2529 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2530 self._RemoveOldStorage(self.target_node, iv_names)
2531