Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with storage of instances.""" 
  32   
  33  import itertools 
  34  import logging 
  35  import os 
  36  import time 
  37   
  38  from ganeti import compat 
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import ht 
  42  from ganeti import locking 
  43  from ganeti.masterd import iallocator 
  44  from ganeti import objects 
  45  from ganeti import utils 
  46  import ganeti.rpc.node as rpc 
  47  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  48  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  49    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  50    ComputeIPolicyDiskSizesViolation, \ 
  51    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  52    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  53    CheckDiskTemplateEnabled 
  54  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  55    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  56    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  57   
  58  import ganeti.masterd.instance 
  59   
  60   
   # Per-template prefix used by GenerateDiskTemplate when it builds the
   # "<prefix>.disk<N>" strings handed to _GenerateUniqueNames. Templates
   # without an entry here get no generated names at all.
   _DISK_TEMPLATE_NAME_PREFIX = {
     constants.DT_PLAIN: "",
     constants.DT_RBD: ".rbd",
     constants.DT_EXT: ".ext",
     constants.DT_FILE: ".file",
     constants.DT_SHARED_FILE: ".sharedfile",
     }
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create one block device on the given node.

  Only the device itself is created; its children are not visited, so
  they have to exist before this function is called.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: passed through to L{backend.BlockdevCreate}, where
      it tells whether we run on the primary node or not; it affects
      both the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  owner_name = instance.name
  rpc_result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                           device.size, owner_name,
                                           force_open, info, excl_stor)
  # The node name is only needed for the failure message
  node_name = lu.cfg.GetNodeName(node_uuid)
  failure_msg = ("Can't create block device %s on"
                 " node %s for instance %s" %
                 (device, node_name, owner_name))
  rpc_result.Raise(failure_msg)
101
102 103 -def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create, 104 info, force_open, excl_stor):
105 """Create a tree of block devices on a given node. 106 107 If this device type has to be created on secondaries, create it and 108 all its children. 109 110 If not, just recurse to children keeping the same 'force' value. 111 112 @attention: The device has to be annotated already. 113 114 @param lu: the lu on whose behalf we execute 115 @param node_uuid: the node on which to create the device 116 @type instance: L{objects.Instance} 117 @param instance: the instance which owns the device 118 @type device: L{objects.Disk} 119 @param device: the device to create 120 @type force_create: boolean 121 @param force_create: whether to force creation of this device; this 122 will be change to True whenever we find a device which has 123 CreateOnSecondary() attribute 124 @param info: the extra 'metadata' we should attach to the device 125 (this will be represented as a LVM tag) 126 @type force_open: boolean 127 @param force_open: this parameter will be passes to the 128 L{backend.BlockdevCreate} function where it specifies 129 whether we run on primary or not, and it affects both 130 the child assembly and the device own Open() execution 131 @type excl_stor: boolean 132 @param excl_stor: Whether exclusive_storage is active for the node 133 134 @return: list of created devices 135 """ 136 created_devices = [] 137 try: 138 if device.CreateOnSecondary(): 139 force_create = True 140 141 if device.children: 142 for child in device.children: 143 devs = _CreateBlockDevInner(lu, node_uuid, instance, child, 144 force_create, info, force_open, excl_stor) 145 created_devices.extend(devs) 146 147 if not force_create: 148 return created_devices 149 150 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open, 151 excl_stor) 152 # The device has been completely created, so there is no point in keeping 153 # its subdevices in the list. We just add the device itself instead. 
154 created_devices = [(node_uuid, device)] 155 return created_devices 156 157 except errors.DeviceCreationError, e: 158 e.created_devices.extend(created_devices) 159 raise e 160 except errors.OpExecError, e: 161 raise errors.DeviceCreationError(str(e), created_devices)
162
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage applies to the node with this UUID.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  node = cfg.GetNodeInfo(node_uuid)
  if node is not None:
    return IsExclusiveStorageEnabledNode(cfg, node)

  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)
181
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Annotate the root device and hand over to L{_CreateBlockDevInner}.

  The exclusive_storage flag of the target node is looked up here and
  passed down together with the annotated disk.

  @return: whatever L{_CreateBlockDevInner} returns

  """
  annotated = AnnotateDiskParams(instance, [device], lu.cfg)
  # Exactly one disk went in, so exactly one must come back
  (root_disk,) = annotated
  node_excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, root_disk,
                              force_create, info, force_open, node_excl_stor)
194
195 196 -def _UndoCreateDisks(lu, disks_created, instance):
197 """Undo the work performed by L{CreateDisks}. 198 199 This function is called in case of an error to undo the work of 200 L{CreateDisks}. 201 202 @type lu: L{LogicalUnit} 203 @param lu: the logical unit on whose behalf we execute 204 @param disks_created: the result returned by L{CreateDisks} 205 @type instance: L{objects.Instance} 206 @param instance: the instance for which disks were created 207 208 """ 209 for (node_uuid, disk) in disks_created: 210 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance)) 211 result.Warn("Failed to remove newly-created disk %s on node %s" % 212 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
def CreateDisks(lu, instance, instance_disks=None,
                to_skip=None, target_node_uuid=None, disks=None):
  """Create the disks of an instance on all of its nodes.

  This abstracts away some work from AddInstance.

  The instance may not be stored in the configuration yet, in which case
  its disks cannot be looked up there and have to be passed in through
  C{instance_disks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type instance_disks: list of L{objects.Disk}
  @param instance_disks: the disks that belong to the instance; if not
      specified, retrieve them from config file
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpExecError: if creating a device fails; everything
      created up to that point is removed again first

  """
  info = GetInstanceInfoText(instance)
  if instance_disks is None:
    instance_disks = lu.cfg.GetInstanceDisks(instance.uuid)

  if target_node_uuid is not None:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]
  else:
    pnode_uuid = instance.primary_node
    # We cannot use config's 'GetInstanceNodes' here as 'CreateDisks'
    # is used by 'LUInstanceCreate' and the instance object is not
    # stored in the config yet.
    disk_nodes = set([uuid
                      for inst_disk in instance_disks
                      for uuid in inst_disk.all_nodes])
    # ensure that primary node is always the first
    disk_nodes.discard(pnode_uuid)
    all_node_uuids = [pnode_uuid] + list(disk_nodes)

  if disks is None:
    disks = instance_disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    first_disk_path = instance_disks[0].logical_id[1]
    file_storage_dir = os.path.dirname(first_disk_path)
    dir_result = lu.rpc.call_file_storage_dir_create(pnode_uuid,
                                                     file_storage_dir)
    dir_result.Raise("Failed to create directory '%s' on"
                     " node %s" % (file_storage_dir,
                                   lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for (idx, device) in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Creation and opening are both forced on the primary node only
      on_primary = (node_uuid == pnode_uuid)
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, on_primary, info,
                        on_primary)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError as err:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Include the partially created devices in the rollback
        disks_created.extend(err.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(err.message)
  return disks_created
294
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @param disk_template: the disk template to compute the requirements for
  @type disks: list of dicts
  @param disks: the disk definitions; each one is read for its
      C{constants.IDISK_VG} and C{constants.IDISK_SIZE} entries
  @rtype: dict
  @return: the required space per volume group name; empty for templates
      that do not use volume groups
  @raise errors.ProgrammerError: if the disk template is not known here

  """
  def _compute(disks, payload):
    """Universal algorithm.

    Sums up, per volume group, the size of every disk plus C{payload}.

    """
    vgs = {}
    for disk in disks:
      # The running total must be looked up under the disk's own volume
      # group; looking it up under the literal key name would make several
      # disks in the same volume group overwrite each other.
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # constants.DRBD_META_SIZE is added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    constants.DT_GLUSTER: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def ComputeDisks(op, default_vg):
  """Validate and normalise the disk definitions of an instance opcode.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks, one dict per entry of C{op.disks}
  @raise errors.OpPrereqError: for an invalid access mode, a missing or
      non-integer size, a provider given for a non-ext template, or a
      missing provider for the ext template

  """
  computed = []
  for spec in op.disks:
    access_mode = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if access_mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 access_mode, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      disk_size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    is_ext = (op.disk_template == constants.DT_EXT)
    ext_provider = spec.get(constants.IDISK_PROVIDER, None)
    if ext_provider and not is_ext:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    volume_group = spec.get(constants.IDISK_VG, default_vg)
    disk_name = spec.get(constants.IDISK_NAME, None)
    # An explicit "none" (in any case) means that no name is wanted
    if disk_name is not None and disk_name.lower() == constants.VALUE_NONE:
      disk_name = None

    new_disk = {
      constants.IDISK_SIZE: disk_size,
      constants.IDISK_MODE: access_mode,
      constants.IDISK_VG: volume_group,
      constants.IDISK_NAME: disk_name,
      }

    # Optional settings are copied verbatim when present
    optional_keys = (
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      )
    for key in optional_keys:
      if key in spec:
        new_disk[key] = spec[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if is_ext:
      if not ext_provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      new_disk[constants.IDISK_PROVIDER] = ext_provider
      for key in spec:
        if key not in constants.IDISK_PARAMS:
          new_disk[key] = spec[key]

    computed.append(new_disk)

  return computed
394
def CheckRADOSFreeSpace():
  """Check the disk size requirements inside the RADOS cluster.

  Nothing is actually verified: for the RADOS cluster we assume there is
  always enough space.

  """
  return None
402
403 404 -def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names, 405 iv_name, p_minor, s_minor):
406 """Generate a drbd8 device complete with its children. 407 408 """ 409 assert len(vgnames) == len(names) == 2 410 port = lu.cfg.AllocatePort() 411 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) 412 413 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size, 414 logical_id=(vgnames[0], names[0]), 415 params={}) 416 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 417 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN, 418 size=constants.DRBD_META_SIZE, 419 logical_id=(vgnames[1], names[1]), 420 params={}) 421 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 422 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size, 423 logical_id=(primary_uuid, secondary_uuid, port, 424 p_minor, s_minor, 425 shared_secret), 426 children=[dev_data, dev_meta], 427 iv_name=iv_name, params={}) 428 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 429 return drbd_dev
430
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  @param lu: the lu on whose behalf we execute
  @param template_name: the disk template to generate the disks for
  @param instance_uuid: UUID of the owning instance; used for the DRBD
      minor allocation and in the Gluster disk path
  @param primary_node_uuid: UUID of the primary node
  @param secondary_node_uuids: UUIDs of the secondary nodes; exactly one
      is required for DRBD8 and none is allowed for the other templates
      that have disks
  @type disk_info: list of dicts
  @param disk_info: the disk definitions, keyed by the C{IDISK_*}
      constants
  @param file_storage_dir: directory holding the disks of file-based
      templates
  @param file_driver: first element of the logical ID of file-based
      and Gluster disks
  @param base_index: index of the first generated disk
  @param feedback_fn: function called with a progress message for each
      generated non-DRBD disk
  @param full_disk_params: disk parameters handed to
      L{objects.Disk.ComputeLDParams} for DRBD8
  @rtype: list of L{objects.Disk}
  @return: the generated disks (empty for the diskless template)
  @raise errors.ProgrammerError: for an unknown template, a wrong number
      of secondary nodes, or a missing provider for the ext template

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    # Two minors per disk, alternating between primary and secondary
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two names per disk: data volume first, then meta volume
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Every logical_id_fn takes (idx, disk_index, disk): the position in
    # disk_info, the final disk index and the disk definition
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      # idx has to be a real parameter here; picking it up from the
      # enclosing scope would tie the result to the name of the loop
      # variable used further down
      logical_id_fn = \
        lambda idx, _1, _2: (file_driver,
                             "%s/%s" % (file_storage_dir,
                                        names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          # The message has to be interpolated explicitly; extra
          # positional arguments are not formatted into it
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Verify that the spindles option matches the exclusive_storage state.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should
      not be, or when they are required and missing

  """
  if not es_flag:
    # Without exclusive storage no spindles may be given at all
    if diskdict.get(constants.IDISK_SPINDLES) is not None:
      raise errors.OpPrereqError("Spindles in instance disks cannot be"
                                 " specified when exclusive storage is not"
                                 " active", errors.ECODE_INVAL)
    return

  if required and diskdict.get(constants.IDISK_SPINDLES) is None:
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
570
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  The disks can be recreated in place, on explicitly given replacement
  nodes, or on nodes chosen by an iallocator.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters that may be changed while recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success the selected nodes are stored in C{self.op.node_uuids} and
    C{self.op.nodes}.

    @raise errors.OpPrereqError: if the allocator run was not successful

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should be already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.cfg.GetInstanceDisks(self.instance.uuid)]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == \
      len(self.cfg.GetInstanceNodes(self.instance.uuid))

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Normalize and validate the opcode arguments.

    @raise errors.OpPrereqError: if a disk is specified more than once or
        a parameter outside of C{_MODIFYABLE} is to be changed

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declare the instance lock and prepare the node-level locks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    @param level: the locking level for which locks are being declared

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the master node followed by the instance's nodes, used for
        both elements of the returned pair

    """
    nl = [self.cfg.GetMasterNode()] + \
      list(self.cfg.GetInstanceNodes(self.instance.uuid))
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    @raise errors.OpPrereqError: if the number of replacement nodes does
        not match, the instance has no disks, a disk index is invalid, or
        a partial recreation is combined with a change of nodes

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
      if len(self.op.node_uuids) != len(inst_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(inst_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    if not self.disks:
      # Nothing to recreate; without this check max() below would fail
      # with a bare ValueError instead of a proper prerequisite error
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    # Compare against a real list: a lazy range object never equals a list
    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != list(range(len(instance.disks)))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
    # Exclusive storage counts as active if any involved node has it
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    @param feedback_fn: function passed on to the configuration updates

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    for idx, disk in enumerate(inst_disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = inst_disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
      self.cfg.Update(disk, feedback_fn)

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
    assert mylocks.issuperset(frozenset(inst_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(inst_disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)
901
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Query nodes for LVM volume group and hypervisor information.

  The storage units for the volume group are prepared for the given nodes,
  the hypervisor type and its cluster-level parameters are read from the
  configuration, and a single node info RPC is issued with both.

  @type lu: C{LogicalUnit}
  @param lu: logical unit giving access to the configuration and to RPC
  @type node_uuids: list of string
  @param node_uuids: UUIDs of the nodes to query
  @type vg: string
  @param vg: name of the volume group to ask about
  @return: whatever C{lu.rpc.call_node_info} returned, unmodified

  """
  requested_units = [(constants.ST_LVM_VG, vg)]
  units_for_nodes = rpc.PrepareStorageUnitsForNodes(lu.cfg, requested_units,
                                                    node_uuids)
  hv_type = lu.cfg.GetHypervisorType()
  cluster_hvparams = lu.cfg.GetClusterInfo().hvparams
  hv_specs = [(hv_type, cluster_hvparams[hv_type])]
  return lu.rpc.call_node_info(node_uuids, units_for_nodes, hv_specs)
922
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_name: string
  @param node_name: the name of the node, used in error messages only
  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node; only the
      second element (the space information) is looked at
  @type vg: string
  @param vg: volume group name, used in error messages only
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
    space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    # Carry an error code like the other prerequisite failures below, so all
    # three failure modes of this check are reported uniformly
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    # A missing or non-integer value means the node could not report its
    # free space
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)
954
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Verify that every given node has enough free space in one VG.

  A single node info call is made for all nodes; each node's answer is then
  validated and its free space compared against the requested amount.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if a node doesn't have enough disk,
      or we cannot check the node

  """
  rpc_results = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    node_result = rpc_results[uuid]
    failure_text = "Cannot get current information from node %s" % name
    node_result.Raise(failure_text, prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, node_result.payload, vg, requested)
983
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Verify free disk space on all nodes for each requested volume group.

  Every entry of C{req_sizes} is checked in turn against all given nodes;
  the first volume group that fails aborts the check with an exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: maps each volume group name to the amount of disk in
      MiB that must be free in it
  @raise errors.OpPrereqError: if a node doesn't have enough disk,
      or we cannot check the node

  """
  for (vg_name, needed_mib) in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, needed_mib)
1006
1007 1008 -def _DiskSizeInBytesToMebibytes(lu, size):
1009 """Converts a disk size in bytes to mebibytes. 1010 1011 Warns and rounds up if the size isn't an even multiple of 1 MiB. 1012 1013 """ 1014 (mib, remainder) = divmod(size, 1024 * 1024) 1015 1016 if remainder != 0: 1017 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 1018 " to not overwrite existing data (%s bytes will not be" 1019 " wiped)", (1024 * 1024) - remainder) 1020 mib += 1 1021 1022 return mib
1023
1024 1025 -def _CalcEta(time_taken, written, total_size):
1026 """Calculates the ETA based on size written and total size. 1027 1028 @param time_taken: The time taken so far 1029 @param written: amount written so far 1030 @param total_size: The total size of data to be written 1031 @return: The remaining time in seconds 1032 1033 """ 1034 avg_time = time_taken / float(written) 1035 return (total_size - written) * avg_time
1036
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Synchronization of the affected disks is paused on the primary node before
  wiping and resumed afterwards, also when wiping fails.  Each disk is wiped
  in chunks, and progress is reported at most once per minute.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
      start offset; if None, all disks of the instance are wiped from
      offset 0

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(inst_disks)]

  # Build a real list once; it is sent in both the pause and resume requests
  devices = [device for (_, device, _) in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (devices, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # The result is clamped to at least 1, as the truncation yields 0 for
      # small disks and a zero-sized chunk would never advance the offset.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 *
                       constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      start_offset = offset
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          # Only the area wiped in this run counts as written; the part of
          # the disk before the start offset was never touched here
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (devices, instance),
                                                    False)

    # Failing to resume is only reported, so that an error raised while
    # wiping is not masked by a second exception
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1131
def ImageDisks(lu, instance, image, disks=None):
  """Dumps an image onto an instance disk.

  Synchronization of the affected disks is paused on the primary node before
  imaging and resumed afterwards, also when imaging fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should image
  @type image: string
  @param image: the image to dump onto the disks
  @type disks: None or list of ints
  @param disks: disk indices; if None, only the first disk of the instance
      is imaged

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if disks is None:
    disks = [(0, inst_disks[0])]
  else:
    # A real list is needed, as the pairs are walked more than once below
    disks = [(idx, inst_disks[idx]) for idx in disks]
  devices = [device for (_, device) in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (devices, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device) in disks:
      lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
                 idx, instance.name, node_name)

      result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
                                          image, device.size)
      result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
                   (idx, instance.name, node_name))
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (devices, instance),
                                                    False)

    # Failing to resume is only reported, so that an error raised while
    # imaging is not masked by a second exception
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization for instance '%s'"
                    " on node '%s': %s",
                    instance.name, node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Failed to resume synchronization of disk '%d' of"
                        " instance '%s'", idx, instance.name)
1194
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wipe disks through L{WipeDisks}, undoing disk creation on failure.

  If wiping raises L{errors.OpExecError}, the failure is logged, the disk
  creation described by C{cleanup} is undone and the original exception is
  raised again.  Other exceptions are not intercepted.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: re-raised from L{WipeDisks} after the cleanup

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    # Roll back the freshly created disks before propagating the failure
    logging.warning("Wiping disks for instance '%s' failed", instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise
1216
def ExpandCheckDisks(instance_disks, disks):
  """Resolve a disk selection against the disks of an instance.

  @type instance_disks: list of L{objects.Disk}
  @param instance_disks: all disks of the target instance
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks; None selects every instance disk
  @rtype: list of L{objects.Disk}
  @return: C{instance_disks} itself if nothing was selected, otherwise the
      C{disks} argument after validation
  @raise errors.ProgrammerError: if a selected disk's UUID is not among the
      UUIDs of the instance disks

  """
  if disks is None:
    return instance_disks

  known_uuids = [known.uuid for known in instance_disks]
  wanted_uuids = [wanted.uuid for wanted in disks]
  if set(wanted_uuids).issubset(known_uuids):
    return disks

  raise errors.ProgrammerError("Can only act on disks belonging to the"
                               " target instance: expected a subset of %s,"
                               " got %s" % (known_uuids, wanted_uuids))
1237
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  The mirror status of the selected disks is queried on the primary node
  until no disk reports a sync in progress any more.  If the disks look
  degraded once syncing has finished, a few short retries are made to rule
  out a transient state.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are polled
  @type disks: list of L{objects.Disk} or None
  @param disks: the disks to wait for; None means all disks of the instance
  @type oneshot: boolean
  @param oneshot: if true, do not wait for the sync to finish; only the
      retries for degraded disks are still performed
  @rtype: boolean
  @return: True if there was nothing to wait for or if, in the last poll, no
      disk was degraded without a sync in progress; False otherwise
  @raise errors.RemoteError: if the node fails to return mirror data ten
      times in a row

  """
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if not inst_disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(inst_disks, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    # A successful call resets the count of consecutive failures
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      # Only a degraded disk that is not syncing counts as degraded; one
      # that is still syncing is expected to recover on its own
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          # Keep the largest estimate over all disks instead of letting the
          # last disk in the list decide how long we sleep
          max_time = max(max_time, mstat.estimated_time)
        else:
          rem_time = "no time estimate"
          max_time = max(max_time, 5) # sleep at least a bit between retries
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # Never sleep longer than a minute between two polls
    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1313
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shut down the block devices of an instance on all of its nodes.

  A failed shutdown is always logged as a warning.  It makes the function
  return False when it happens on the primary node and C{ignore_primary} is
  false, or when it happens on another node that is not reported offline.

  When no explicit disk list is given, the instance's disks are also marked
  inactive in the configuration, so the caller should re-read the instance
  configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are shut down
  @type disks: list of L{objects.Disk} or None
  @param disks: the disks to shut down; None means all disks of the instance
  @type ignore_primary: boolean
  @param ignore_primary: if true, failures on the primary node do not affect
      the return value
  @rtype: boolean
  @return: False if a failure that is not ignored occurred, True otherwise

  """
  if disks is None:
    # only a shutdown of all disks flips the instance's disks to inactive
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  selected = ExpandCheckDisks(lu.cfg.GetInstanceDisks(instance.uuid), disks)

  success = True
  for disk in selected:
    for (node_uuid, top_disk) in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      failure = result.fail_msg
      if not failure:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), failure)
      if node_uuid == instance.primary_node:
        counts_as_error = not ignore_primary
      else:
        counts_as_error = not result.offline
      if counts_as_error:
        success = False
  return success
1346
def _SafeShutdownInstanceDisks(lu, instance, disks=None, req_states=None):
  """Shut down instance disks after verifying the instance state.

  The instance state is checked against C{req_states} through
  L{CheckInstanceState} first; only then is L{ShutdownInstanceDisks} called.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are shut down
  @param disks: passed on to L{ShutdownInstanceDisks}
  @param req_states: the states the instance is checked against; defaults
      to C{INSTANCE_DOWN} when None

  """
  allowed_states = INSTANCE_DOWN if req_states is None else req_states
  CheckInstanceState(lu, instance, allowed_states,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1359
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Assemble the block devices of an instance on all of its nodes.

  The devices are first assembled everywhere in secondary mode and then,
  in a second pass, in primary mode on the primary node only.

  When no explicit disk list is given, the instance's disks are marked
  active in the configuration; if the assembly fails they are marked
  inactive again.  The caller should therefore re-read the instance
  configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, first-pass errors do not make the
      operation fail
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: C{(disks_ok, device_info)}; C{disks_ok} is False if the operation
      failed, and C{device_info} holds one
      (primary node name, disk iv_name, device path) tuple per disk, the
      path being None where the primary assembly did not succeed

  """
  if disks is None:
    # only mark instance disks as active if all disks are affected
    instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)

  selected = ExpandCheckDisks(lu.cfg.GetInstanceDisks(instance.uuid), disks)
  all_ok = True

  def _Assemble(node_uuid, node_disk, as_primary, disk_index):
    """Issue one assemble request, dropping the disk size if requested."""
    if ignore_size:
      node_disk = node_disk.Copy()
      node_disk.UnsetSize()
    return lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                         instance, as_primary, disk_index)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for (disk_index, inst_disk) in enumerate(selected):
    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      result = _Assemble(node_uuid, node_disk, False, disk_index)
      failure = result.fail_msg
      if not failure:
        continue

      secondaries = lu.cfg.GetInstanceSecondaryNodes(instance.uuid)
      offline_secondary = node_uuid in secondaries and result.offline
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), failure)
      if not ignore_secondaries and not offline_secondary:
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  mapping = []
  for (disk_index, inst_disk) in enumerate(selected):
    device_path = None

    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      if node_uuid == instance.primary_node:
        result = _Assemble(node_uuid, node_disk, True, disk_index)
        failure = result.fail_msg
        if failure:
          lu.LogWarning("Could not prepare block device %s on node %s"
                        " (is_primary=True, pass=2): %s",
                        inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid),
                        failure)
          all_ok = False
        else:
          (device_path, _, _) = result.payload

    mapping.append((lu.cfg.GetNodeName(instance.primary_node),
                    inst_disk.iv_name, device_path))

  if not all_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return all_ok, mapping
1458
def StartInstanceDisks(lu, instance, force):
  """Assemble all disks of an instance, failing hard if that is impossible.

  If the disks cannot be assembled they are shut down again and an
  exception is raised.  Both steps may modify the configuration of the
  instance, so the caller should re-read the instance configuration, if
  needed.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are started
  @param force: passed to L{AssembleInstanceDisks} as
      C{ignore_secondaries}; if it is false (but not None), a hint about
      retrying with '--force' is logged on failure
  @raise errors.OpExecError: if the disks could not be assembled

  """
  (assembled, _) = AssembleInstanceDisks(lu, instance,
                                         ignore_secondaries=force)
  if assembled:
    return

  ShutdownInstanceDisks(lu, instance)
  if not (force is None or force):
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1476
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  The disk is grown either by a relative amount or to an absolute target
  size, depending on C{self.op.absolute}.  If the cluster has
  C{prealloc_wipe_disks} set, the newly added space is wiped afterwards.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance and declare the node lock levels.

    The node and node resource lock lists start out empty and are filled
    in by L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Fill in the node locks, and copy them for the node resource level.

    @param level: the locking level being declared

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    @rtype: dict
    @return: the disk, amount and absolute flag of the request, extended by
        the instance's hook environment

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @rtype: tuple
    @return: the same list, holding the master node followed by the
        instance's nodes, for both elements of the tuple

    """
    nl = [self.cfg.GetMasterNode()] + \
      list(self.cfg.GetInstanceNodes(self.instance.uuid))
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes are
    online, that its disk template can be grown and that the requested size
    change is not negative.  It also computes C{self.delta} and
    C{self.target} and verifies free disk space and the instance policy.

    @raise errors.OpPrereqError: if one of the checks fails

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.cfg.GetInstanceNodes(self.instance.uuid))
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk))

    if self.op.absolute:
      # The amount is the final size; derive the increment from it
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # The amount is the increment; derive the final size from it
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

    self._CheckIPolicy(self.target)

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Check the free disk space on the nodes, where that makes sense.

    The check is skipped for disk templates listed in
    C{constants.DTS_NO_FREE_SPACE_CHECK} and when any node uses exclusive
    storage.

    @param node_uuids: the UUIDs of the nodes to check
    @param req_vgspace: the required space, as accepted by
        L{CheckNodesFreeDiskPerVG}

    """
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def _CheckIPolicy(self, target_size):
    """Check that the grown disk still satisfies the instance policy.

    A violation is only logged as a warning if C{self.op.ignore_ipolicy} is
    set.

    @param target_size: the size of the disk after growing
    @raise errors.OpPrereqError: if the policy is violated and violations
        are not ignored

    """
    cluster = self.cfg.GetClusterInfo()
    group_uuid = list(self.cfg.GetInstanceNodeGroups(self.op.instance_uuid,
                                                     primary_only=True))[0]
    group_info = self.cfg.GetNodeGroup(group_uuid)
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            group_info)

    # Sizes of all disks of the instance, with the disk being grown already
    # at its target size
    disk_sizes = [disk.size if disk.uuid != self.disk.uuid else target_size
                  for disk in self.cfg.GetInstanceDisks(self.op.instance_uuid)]

    # The ipolicy checker below ignores None, so we only give it the disk size
    res = ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes,
                                           self.instance.disk_template)
    if res:
      msg = ("Growing disk %s violates policy: %s" %
             (self.op.disk,
              utils.CommaJoin(res)))
      if self.op.ignore_ipolicy:
        self.LogWarning(msg)
      else:
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The grow is first requested in dry-run mode on all nodes, then for real
    on the backing storage of all nodes and finally on the logical storage
    of the primary node.  Afterwards the new size is recorded, the new space
    is wiped if the cluster asks for it, and the disk sync is waited for if
    requested.

    @param feedback_fn: function used to send feedback to the caller
    @raise errors.OpExecError: if the disk cannot be activated or its
        current size cannot be retrieved for wiping

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
    for node_uuid in inst_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Report the node by name, as all other messages here do; the
      # primary_node attribute is passed to GetNodeName as a UUID elsewhere
      primary_name = self.cfg.GetNodeName(self.instance.primary_node)

      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   primary_name)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % primary_name)
      (disk_size_in_bytes, _) = disk_dimensions

      # The size before growing is the offset at which wiping starts
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in inst_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)
    self.cfg.Update(self.disk, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.WConfdClient().DownGradeLocksLevel(
          locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
      assert inst_disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        # The disk was only activated above for growing; deactivate it again
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1702
1703 1704 -class LUInstanceReplaceDisks(LogicalUnit):
1705 """Replace the disks of an instance. 1706 1707 """ 1708 HPATH = "mirrors-replace" 1709 HTYPE = constants.HTYPE_INSTANCE 1710 REQ_BGL = False 1711
1712 - def CheckArguments(self):
1713 """Check arguments. 1714 1715 """ 1716 if self.op.mode == constants.REPLACE_DISK_CHG: 1717 if self.op.remote_node is None and self.op.iallocator is None: 1718 raise errors.OpPrereqError("When changing the secondary either an" 1719 " iallocator script must be used or the" 1720 " new node given", errors.ECODE_INVAL) 1721 else: 1722 CheckIAllocatorOrNode(self, "iallocator", "remote_node") 1723 1724 elif self.op.remote_node is not None or self.op.iallocator is not None: 1725 # Not replacing the secondary 1726 raise errors.OpPrereqError("The iallocator and new node options can" 1727 " only be used when changing the" 1728 " secondary node", errors.ECODE_INVAL)
1729
1730 - def ExpandNames(self):
1731 self._ExpandAndLockInstance() 1732 1733 assert locking.LEVEL_NODE not in self.needed_locks 1734 assert locking.LEVEL_NODE_RES not in self.needed_locks 1735 assert locking.LEVEL_NODEGROUP not in self.needed_locks 1736 1737 assert self.op.iallocator is None or self.op.remote_node is None, \ 1738 "Conflicting options" 1739 1740 if self.op.remote_node is not None: 1741 (self.op.remote_node_uuid, self.op.remote_node) = \ 1742 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 1743 self.op.remote_node) 1744 1745 # Warning: do not remove the locking of the new secondary here 1746 # unless DRBD8Dev.AddChildren is changed to work in parallel; 1747 # currently it doesn't since parallel invocations of 1748 # FindUnusedMinor will conflict 1749 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid] 1750 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND 1751 else: 1752 self.needed_locks[locking.LEVEL_NODE] = [] 1753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE 1754 1755 if self.op.iallocator is not None: 1756 # iallocator will select a new node in the same group 1757 self.needed_locks[locking.LEVEL_NODEGROUP] = [] 1758 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET 1759 1760 self.needed_locks[locking.LEVEL_NODE_RES] = [] 1761 1762 self.replacer = TLReplaceDisks(self, self.op.instance_uuid, 1763 self.op.instance_name, self.op.mode, 1764 self.op.iallocator, self.op.remote_node_uuid, 1765 self.op.disks, self.op.early_release, 1766 self.op.ignore_ipolicy) 1767 1768 self.tasklets = [self.replacer]
1769
def DeclareLocks(self, level):
  """Fill in the lock list for one locking level.

  @param level: the locking level being declared; only the node group,
      node and node resource levels are handled here

  """
  if level == locking.LEVEL_NODEGROUP:
    # Group locks are only requested when an iallocator picks the node
    assert self.op.remote_node_uuid is None
    assert self.op.iallocator is not None
    assert not self.needed_locks[locking.LEVEL_NODEGROUP]

    self.share_locks[locking.LEVEL_NODEGROUP] = 1
    # Lock all groups used by instance optimistically; this requires going
    # via the node before it's locked, requiring verification later on
    instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
    self.needed_locks[locking.LEVEL_NODEGROUP] = instance_groups

  elif level == locking.LEVEL_NODE:
    if self.op.iallocator is None:
      self._LockInstancesNodes()
    else:
      assert self.op.remote_node_uuid is None
      assert not self.needed_locks[locking.LEVEL_NODE]
      assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

      # Lock member nodes of all locked groups
      member_uuids = []
      for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
        member_uuids.extend(self.cfg.GetNodeGroup(group_uuid).members)
      self.needed_locks[locking.LEVEL_NODE] = member_uuids

  elif level == locking.LEVEL_NODE_RES:
    # Reuse node locks (the very same list object, not a copy)
    node_locks = self.needed_locks[locking.LEVEL_NODE]
    self.needed_locks[locking.LEVEL_NODE_RES] = node_locks
1800
def BuildHooksEnv(self):
  """Build the hooks environment for the disk replacement.

  @return: dictionary holding the C{MODE}, C{NEW_SECONDARY} and
      C{OLD_SECONDARY} keys, updated with the result of
      L{BuildInstanceHookEnvByObject} for the instance

  """
  inst = self.replacer.instance
  secondaries = self.cfg.GetInstanceSecondaryNodes(inst.uuid)

  hooks_env = {}
  hooks_env["MODE"] = self.op.mode
  hooks_env["NEW_SECONDARY"] = self.op.remote_node
  hooks_env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondaries[0])
  hooks_env.update(BuildInstanceHookEnvByObject(self, inst))

  return hooks_env
1816
def BuildHooksNodes(self):
  """Build the list of nodes the hooks are run on.

  The list contains the master node, the instance's primary node and,
  if one was given, the new secondary node.

  @return: tuple made of the same node list twice

  """
  inst = self.replacer.instance
  hook_nodes = [self.cfg.GetMasterNode(), inst.primary_node]

  new_secondary_uuid = self.op.remote_node_uuid
  if new_secondary_uuid is not None:
    hook_nodes += [new_secondary_uuid]

  return (hook_nodes, hook_nodes)
1829
def CheckPrereq(self):
  """Check prerequisites.

  If any node group locks are owned, L{CheckInstanceNodeGroups} is called
  on them first; the rest is delegated to L{LogicalUnit.CheckPrereq}.

  """
  # The group locks were acquired optimistically, re-check them now
  held_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
  if held_groups:
    CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, held_groups)

  base_result = LogicalUnit.CheckPrereq(self)
  return base_result
1840
class LUInstanceActivateDisks(NoHooksLU):
  """Logical unit activating the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    node_level = locking.LEVEL_NODE
    self.needed_locks[node_level] = []
    self.recalculate_locks[node_level] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the instance's node locks at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    Loads the instance object from the configuration and runs
    L{CheckNodeOnline} on its primary node.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = inst
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Assemble the instance's disks, optionally waiting for sync.

    @return: the disk information returned by L{AssembleInstanceDisks}

    """
    (assembled, disks_info) = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")

    # WaitForSync is only invoked when the opcode asks for it
    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
1884
class LUInstanceDeactivateDisks(NoHooksLU):
  """Logical unit shutting down the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    node_level = locking.LEVEL_NODE
    self.needed_locks[node_level] = []
    self.recalculate_locks[node_level] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the instance's node locks at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    Loads the instance object from the configuration.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = inst
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shut down the instance's disks.

    With the C{force} opcode parameter L{ShutdownInstanceDisks} is used,
    otherwise L{_SafeShutdownInstanceDisks}.

    """
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, self.instance)
    else:
      ShutdownInstanceDisks(self, self.instance)
1919
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that a device and its children are not degraded on a node.

  @attention: The device has to be annotated already.

  @param lu: the logical unit providing C{rpc}, C{cfg} and C{LogWarning}
  @param instance: the instance owning the device
  @param dev: the device to check
  @param node_uuid: the node on which to look the device up
  @param on_primary: whether the node is the instance's primary; if not,
      the device itself is only queried when C{dev.AssembleOnSecondary()}
      is true
  @param ldisk: if True, test the C{ldisk_status} of the device against
      C{constants.LDS_OKAY} instead of its C{is_degraded} flag; children
      are always checked with the default (C{is_degraded}) test
  @return: True if no checked device was missing or degraded

  """
  consistent = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    failure = rstats.fail_msg
    if failure:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), failure)
      consistent = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      consistent = False
    elif ldisk:
      consistent = rstats.payload.ldisk_status == constants.LDS_OKAY
    else:
      consistent = not rstats.payload.is_degraded

  for child in dev.children or ():
    # Once a problem was found the remaining children are not queried
    if not consistent:
      break
    consistent = _CheckDiskConsistencyInner(lu, instance, child,
                                            node_uuid, on_primary)

  return consistent
1956
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Annotate a device and run L{_CheckDiskConsistencyInner} on it.

  The parameters and the return value are those of
  L{_CheckDiskConsistencyInner}, except that C{dev} is annotated here via
  L{AnnotateDiskParams} before being checked.

  """
  [annotated_dev] = AnnotateDiskParams(instance, [dev], lu.cfg)
  is_consistent = _CheckDiskConsistencyInner(lu, instance, annotated_dev,
                                             node_uuid, on_primary,
                                             ldisk=ldisk)
  return is_consistent
1965
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Run call_blockdev_find on a device after annotating its diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  [annotated_dev] = AnnotateDiskParams(instance, [dev], lu.cfg)
  rpc_result = lu.rpc.call_blockdev_find(node_uuid, (annotated_dev, instance))
  return rpc_result
1979
def _GenerateUniqueNames(lu, exts):
  """Generate one unique name per given extension.

  For every entry of C{exts} a new ID is requested from the configuration
  and the entry is appended to it.

  @param lu: the logical unit providing C{cfg} and C{proc}
  @param exts: the suffixes to append to the generated IDs
  @return: list of names, in the order of C{exts}

  """
  return ["%s%s" % (lu.cfg.GenerateUniqueID(lu.proc.GetECId()), suffix)
          for suffix in exts]
1992
1993 1994 -class TLReplaceDisks(Tasklet):
1995 """Replaces disks for an instance. 1996 1997 Note: Locking is not within the scope of this class. 1998 1999 """
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
             remote_node_uuid, disks, early_release, ignore_ipolicy):
  """Store the replacement parameters and reset the runtime state.

  All arguments besides C{lu} are saved unchanged in attributes of the
  same name; the runtime attributes start out as None and are filled in
  by L{CheckPrereq}.

  """
  Tasklet.__init__(self, lu)

  # What to replace: the instance, the replacement mode and the disks
  self.instance_uuid = instance_uuid
  self.instance_name = instance_name
  self.mode = mode
  self.disks = disks

  # Where the new secondary comes from: an allocator or an explicit node
  self.iallocator_name = iallocator_name
  self.remote_node_uuid = remote_node_uuid

  # Behaviour switches
  self.early_release = early_release
  self.ignore_ipolicy = ignore_ipolicy

  # Runtime data, not known yet
  self.instance = self.remote_node_info = self.node_secondary_ip = None
  self.new_node_uuid = self.target_node_uuid = self.other_node_uuid = None
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
                  relocate_from_node_uuids):
  """Ask an IAllocator for a new secondary node.

  @param lu: the logical unit providing C{cfg}, C{rpc} and C{LogInfo}
  @param iallocator_name: name of the allocator to run
  @param instance_uuid: UUID of the instance to relocate
  @param relocate_from_node_uuids: UUIDs of the nodes to relocate from
  @return: the UUID of the node chosen by the allocator
  @raise errors.OpPrereqError: if the allocator run was not successful or
      the node it named is not found in the configuration

  """
  request = iallocator.IAReqRelocate(
    inst_uuid=instance_uuid,
    relocate_from_node_uuids=list(relocate_from_node_uuids))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if not allocator.success:
    failure_text = ("Can't compute nodes using iallocator '%s':"
                    " %s" % (iallocator_name, allocator.info))
    raise errors.OpPrereqError(failure_text, errors.ECODE_NORES)

  # Only the first entry of the allocator result is used
  chosen_name = allocator.result[0]
  chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)
  if chosen_node is None:
    raise errors.OpPrereqError("Node %s not found in configuration" %
                               chosen_name, errors.ECODE_NOENT)

  lu.LogInfo("Selected new secondary for instance '%s': %s",
             instance_uuid, chosen_name)

  return chosen_node.uuid
2054
def _FindFaultyDisks(self, node_uuid):
  """Call L{FindFaultyInstanceDisks} for this instance on one node.

  @param node_uuid: the node on which to look for faulty disks
  @return: the result of L{FindFaultyInstanceDisks}

  """
  faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)
  return faulty
2061
def _CheckDisksActivated(self, instance):
  """Check whether all disks of an instance are found on all its nodes.

  Results flagged as offline are ignored.

  @param instance: The instance to check disks
  @return: True if they are activated, False otherwise

  """
  instance_nodes = self.cfg.GetInstanceNodes(instance.uuid)
  instance_disks = self.cfg.GetInstanceDisks(instance.uuid)

  for disk_index, disk in enumerate(instance_disks):
    for node_uuid in instance_nodes:
      self.lu.LogInfo("Checking disk/%d on %s", disk_index,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, disk, instance)

      if not found.offline and (found.fail_msg or not found.payload):
        return False

  return True
2084
def CheckPrereq(self):
  """Check prerequisites and compute the nodes taking part in the operation.

  This verifies that the instance is DRBD8-based with exactly one
  secondary node, determines the new secondary (given explicitly or
  computed through the iallocator) and, depending on the replacement
  mode, sets C{target_node_uuid}, C{other_node_uuid}, C{new_node_uuid}
  and C{disks}. Node and node resource locks on nodes which are not
  touched are released, as are the node allocation and node group locks.
  Finally C{node_secondary_ip} is filled in for the touched nodes.

  @raise errors.OpPrereqError: if the instance or the requested options
      do not allow the replacement
  @raise errors.ProgrammerError: if the replacement mode is unknown

  """
  self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
  assert self.instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  if self.instance.disk_template != constants.DT_DRBD8:
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
  if len(secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(secondary_nodes),
                               errors.ECODE_FAULT)

  secondary_node_uuid = secondary_nodes[0]

  if self.iallocator_name is None:
    remote_node_uuid = self.remote_node_uuid
  else:
    remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                          self.instance.uuid,
                                          secondary_nodes)

  if remote_node_uuid is None:
    self.remote_node_info = None
  else:
    assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
      "Remote node '%s' is not locked" % remote_node_uuid

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node_uuid

  if remote_node_uuid == self.instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node_uuid == secondary_node_uuid:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                  constants.REPLACE_DISK_CHG):
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(self.instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)
    faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
    faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

    if faulty_primary and faulty_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    if faulty_primary:
      self.disks = faulty_primary
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    elif faulty_secondary:
      self.disks = faulty_secondary
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    else:
      # Nothing is faulty; the empty disk list makes Exec a no-op
      self.disks = []
      check_nodes = []

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node_uuid = remote_node_uuid
      self.other_node_uuid = self.instance.primary_node
      self.target_node_uuid = secondary_node_uuid
      check_nodes = [self.new_node_uuid, self.other_node_uuid]

      CheckNodeNotDrained(self.lu, remote_node_uuid)
      CheckNodeVmCapable(self.lu, remote_node_uuid)

      old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
      assert old_node_info is not None
      if old_node_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        # Report the node by name, like the other messages of this class
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode",
                        self.cfg.GetNodeName(secondary_node_uuid))

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced; this must not be
    # applied to the automatic mode, where an empty list means that no
    # disk is faulty
    if not self.disks:
      self.disks = range(len(self.instance.disks))

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
                           self.remote_node_info, self.cfg,
                           ignore=self.ignore_ipolicy)

  for node_uuid in check_nodes:
    CheckNodeOnline(self.lu, node_uuid)

  touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                        self.other_node_uuid,
                                                        self.target_node_uuid]
                            if node_uuid is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    self.instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                in self.cfg.GetMultiNodeInfo(touched_nodes))
2239
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler:
  L{_ExecDrbd8Secondary} if a new node was selected by L{CheckPrereq},
  L{_ExecDrbd8DiskOnly} otherwise. If the instance's disks are not active
  they are started before the replacement and shut down again afterwards.

  @param feedback_fn: function called with progress messages
  @return: the result of the selected handler, or None if there are no
      disks to replace

  """
  if __debug__:
    # Verify owned locks before starting operation
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(owned_nodes) == set(self.node_secondary_ip), \
        ("Incorrect node locks, owning %s, expected %s" %
         (owned_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))

    owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(owned_instances) == [self.instance_name], \
        "Instance '%s' not locked" % self.instance_name

  # An empty disk list is what CheckPrereq leaves behind in automatic mode
  # when no faulty disks were found
  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))

  # Remember whether we have to start (and later stop) the disks ourselves
  activate_disks = not self.instance.disks_active

  # Activate the instance disks if we're replacing them on a down instance
  if activate_disks:
    StartInstanceDisks(self.lu, self.instance, True)
    # Re-read the instance object modified by the previous call
    self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)

  try:
    # Should we replace the secondary node?
    if self.new_node_uuid is not None:
      fn = self._ExecDrbd8Secondary
    else:
      fn = self._ExecDrbd8DiskOnly

    result = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a
    # down instance
    if activate_disks:
      _SafeShutdownInstanceDisks(self.lu, self.instance,
                                 req_states=INSTANCE_NOT_RUNNING)

  # Both handlers are expected to have released all node locks by now
  assert not self.lu.owned_locks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    nodes = frozenset(self.node_secondary_ip)
    assert ((self.early_release and not owned_nodes) or
            (not self.early_release and not (set(owned_nodes) - nodes))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, owned_nodes, nodes))

  return result
2307
def _CheckVolumeGroup(self, node_uuids):
  """Make sure the cluster volume group exists on all given nodes.

  @param node_uuids: the nodes to query
  @raise errors.OpExecError: if the volume groups can't be listed or the
      volume group is missing on one of the nodes

  """
  self.lu.LogInfo("Checking volume groups")

  wanted_vg = self.cfg.GetVGName()

  # Make sure volume group exists on all involved nodes
  vg_lists = self.rpc.call_vg_list(node_uuids)
  if not vg_lists:
    raise errors.OpExecError("Can't list volume groups on the nodes")

  for node_uuid in node_uuids:
    node_result = vg_lists[node_uuid]
    node_result.Raise("Error checking node %s" %
                      self.cfg.GetNodeName(node_uuid))
    if wanted_vg in node_result.payload:
      continue
    raise errors.OpExecError("Volume group '%s' not found on node %s" %
                             (wanted_vg, self.cfg.GetNodeName(node_uuid)))
2324
def _CheckDisksExistence(self, node_uuids):
  """Verify that the disks to be replaced are found on the given nodes.

  Only the disks whose index is in C{self.disks} are checked.

  @param node_uuids: the nodes on which the disks must be found
  @raise errors.OpExecError: if a disk can't be found on a node

  """
  instance_disks = self.cfg.GetInstanceDisks(self.instance.uuid)

  for disk_index, disk in enumerate(instance_disks):
    if disk_index not in self.disks:
      continue

    for node_uuid in node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", disk_index,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, disk, self.instance)

      problem = found.fail_msg
      if not problem and found.payload:
        # The disk is there, go on with the next node
        continue

      problem = problem or "disk not found"
      if self._CheckDisksActivated(self.instance):
        extra_hint = ""
      else:
        extra_hint = ("\nDisks seem to be not properly activated. Try"
                      " running activate-disks on the instance before"
                      " using replace-disks.")
      raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                               (disk_index, self.cfg.GetNodeName(node_uuid),
                                problem, extra_hint))
2350
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
  """Run L{CheckDiskConsistency} on the disks to be replaced.

  Only the disks whose index is in C{self.disks} are checked.

  @param node_uuid: the node on which to check the disks
  @param on_primary: passed on to L{CheckDiskConsistency}
  @param ldisk: passed on to L{CheckDiskConsistency}
  @raise errors.OpExecError: if a disk is not consistent on the node

  """
  instance_disks = self.cfg.GetInstanceDisks(self.instance.uuid)

  for disk_index, disk in enumerate(instance_disks):
    if disk_index not in self.disks:
      continue

    self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                    (disk_index, self.cfg.GetNodeName(node_uuid)))

    is_consistent = CheckDiskConsistency(self.lu, self.instance, disk,
                                         node_uuid, on_primary, ldisk=ldisk)
    if is_consistent:
      continue

    raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                             " replace disks for instance %s" %
                             (self.cfg.GetNodeName(node_uuid),
                              self.instance.name))
2365
def _CreateNewStorage(self, node_uuid):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every disk whose index is in C{self.disks} a new data and a new
  meta logical volume with freshly generated names are created on the
  node, in the volume groups of the current children.

  @param node_uuid: the node on which to create the new volumes
  @return: dictionary mapping the C{iv_name} of each handled disk to a
      C{(dev, old_lvs, new_lvs)} tuple, where C{old_lvs} are copies of
      the current children and C{new_lvs} the newly created volumes
  @raise errors.OpExecError: if a volume can't be created

  """
  iv_names = {}

  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d",
                    self.cfg.GetNodeName(node_uuid), idx)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    # The new volumes inherit volume group and parameters from the
    # children they are going to replace
    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2413
def _CheckDevices(self, node_uuid, iv_names):
  """Verify that all given DRBD devices are found and not degraded.

  @param node_uuid: the node on which to look the devices up
  @param iv_names: dictionary mapping names to tuples whose first
      element is the device to check
  @raise errors.OpExecError: if a device can't be found or is degraded

  """
  for name, (drbd_dev, _, _) in iv_names.iteritems():
    found = _BlockdevFind(self, node_uuid, drbd_dev, self.instance)

    problem = found.fail_msg
    if problem or not found.payload:
      raise errors.OpExecError("Can't find DRBD device %s: %s" %
                               (name, problem or "disk not found"))

    if found.payload.is_degraded:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)
2427
def _RemoveOldStorage(self, node_uuid, iv_names):
  """Remove the old logical volumes, warning about failures.

  @param node_uuid: the node from which to remove the volumes
  @param iv_names: dictionary mapping names to tuples whose second
      element is the list of volumes to remove

  """
  for name, (_, stale_lvs, _) in iv_names.iteritems():
    self.lu.LogInfo("Remove logical volumes for %s", name)

    for stale_lv in stale_lvs:
      removal = self.rpc.call_blockdev_remove(node_uuid,
                                              (stale_lv, self.instance))
      failure = removal.fail_msg
      if not failure:
        continue
      # A failed removal is only reported, it does not abort the loop
      self.lu.LogWarning("Can't remove old LV: %s", failure,
                         hint="remove unused LVs manually")
2438
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: unused by this method
  @raise errors.OpExecError: if the new LVs can't be attached to the
      drbd device; failures of the other steps are raised by the helper
      and RPC result methods called here

  """
  steps_total = 6

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
  self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(
    self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
    False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node_uuid)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                   (dev, self.instance),
                                                   (old_lvs, self.instance))
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" %
                 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == unique_id on that node

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.logical_id[0],
                              d.logical_id[1] + "_replaced-%s" % suff)

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                           (to_ren, self.instance))
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.logical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s",
                    self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                (dev, self.instance),
                                                (new_lvs, self.instance))
    msg = result.fail_msg
    if msg:
      # Attaching failed: try to remove the new LVs again before giving up
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                             (new_lv, self.instance)).fail_msg
        if msg2:
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # The remaining steps are numbered from 5; their order depends on
  # whether the old storage is removed before or after the sync
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2584
2585 - def _ExecDrbd8Secondary(self, feedback_fn):
2586 """Replace the secondary node for DRBD 8. 2587 2588 The algorithm for replace is quite complicated: 2589 - for all disks of the instance: 2590 - create new LVs on the new node with same names 2591 - shutdown the drbd device on the old secondary 2592 - disconnect the drbd network on the primary 2593 - create the drbd device on the new secondary 2594 - network attach the drbd on the primary, using an artifice: 2595 the drbd code for Attach() will connect to the network if it 2596 finds a device which is connected to the good local disks but 2597 not network enabled 2598 - wait for sync across all devices 2599 - remove all disks from the old secondary 2600 2601 Failures are not very well handled. 2602 2603 """ 2604 steps_total = 6 2605 2606 pnode = self.instance.primary_node 2607 2608 # Step: check device activation 2609 self.lu.LogStep(1, steps_total, "Check device existence") 2610 self._CheckDisksExistence([self.instance.primary_node]) 2611 self._CheckVolumeGroup([self.instance.primary_node]) 2612 2613 # Step: check other node consistency 2614 self.lu.LogStep(2, steps_total, "Check peer consistency") 2615 self._CheckDisksConsistency(self.instance.primary_node, True, True) 2616 2617 # Step: create new storage 2618 self.lu.LogStep(3, steps_total, "Allocate new storage") 2619 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 2620 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg) 2621 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, 2622 self.new_node_uuid) 2623 for idx, dev in enumerate(disks): 2624 self.lu.LogInfo("Adding new local storage on %s for disk/%d" % 2625 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2626 # we pass force_create=True to force LVM creation 2627 for new_lv in dev.children: 2628 try: 2629 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance, 2630 new_lv, True, GetInstanceInfoText(self.instance), 2631 False, excl_stor) 2632 except errors.DeviceCreationError, e: 2633 raise 
errors.OpExecError("Can't create block device: %s" % e.message) 2634 2635 # Step 4: dbrd minors and drbd setups changes 2636 # after this, we must manually remove the drbd minors on both the 2637 # error and the success paths 2638 self.lu.LogStep(4, steps_total, "Changing drbd configuration") 2639 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid 2640 for _ in inst_disks], 2641 self.instance.uuid) 2642 logging.debug("Allocated minors %r", minors) 2643 2644 iv_names = {} 2645 for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)): 2646 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % 2647 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2648 # create new devices on new_node; note that we create two IDs: 2649 # one without port, so the drbd will be activated without 2650 # networking information on the new node at this stage, and one 2651 # with network, for the latter activation in step 4 2652 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id 2653 if self.instance.primary_node == o_node1: 2654 p_minor = o_minor1 2655 else: 2656 assert self.instance.primary_node == o_node2, "Three-node instance?" 
2657 p_minor = o_minor2 2658 2659 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None, 2660 p_minor, new_minor, o_secret) 2661 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port, 2662 p_minor, new_minor, o_secret) 2663 2664 iv_names[idx] = (dev, dev.children, new_net_id) 2665 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, 2666 new_net_id) 2667 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8, 2668 logical_id=new_alone_id, 2669 children=dev.children, 2670 size=dev.size, 2671 params={}) 2672 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd], 2673 self.cfg) 2674 try: 2675 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance, 2676 anno_new_drbd, 2677 GetInstanceInfoText(self.instance), False, 2678 excl_stor) 2679 except errors.GenericError: 2680 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2681 raise 2682 2683 # We have new devices, shutdown the drbd on the old secondary 2684 for idx, dev in enumerate(inst_disks): 2685 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) 2686 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid, 2687 (dev, self.instance)).fail_msg 2688 if msg: 2689 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" 2690 "node: %s" % (idx, msg), 2691 hint=("Please cleanup this device manually as" 2692 " soon as possible")) 2693 2694 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") 2695 result = self.rpc.call_drbd_disconnect_net( 2696 [pnode], (inst_disks, self.instance))[pnode] 2697 2698 msg = result.fail_msg 2699 if msg: 2700 # detaches didn't succeed (unlikely) 2701 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2702 raise errors.OpExecError("Can't detach the disks from the network on" 2703 " old node: %s" % (msg,)) 2704 2705 # if we managed to detach at least one, we update all the disks of 2706 # the instance to point to the new secondary 2707 self.lu.LogInfo("Updating instance configuration") 2708 
for dev, _, new_logical_id in iv_names.itervalues(): 2709 dev.logical_id = new_logical_id 2710 self.cfg.Update(dev, feedback_fn) 2711 2712 self.cfg.Update(self.instance, feedback_fn) 2713 2714 # Release all node locks (the configuration has been updated) 2715 ReleaseLocks(self.lu, locking.LEVEL_NODE) 2716 2717 # and now perform the drbd attach 2718 self.lu.LogInfo("Attaching primary drbds to new secondary" 2719 " (standalone => connected)") 2720 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 2721 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, 2722 self.new_node_uuid], 2723 (inst_disks, self.instance), 2724 self.instance.name, 2725 False) 2726 for to_node, to_result in result.items(): 2727 msg = to_result.fail_msg 2728 if msg: 2729 raise errors.OpExecError( 2730 "Can't attach drbd disks on node %s: %s (please do a gnt-instance " 2731 "info %s to see the status of disks)" % 2732 (self.cfg.GetNodeName(to_node), msg, self.instance.name)) 2733 2734 cstep = itertools.count(5) 2735 2736 if self.early_release: 2737 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2738 self._RemoveOldStorage(self.target_node_uuid, iv_names) 2739 # TODO: Check if releasing locks early still makes sense 2740 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) 2741 else: 2742 # Release all resource locks except those used by the instance 2743 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, 2744 keep=self.node_secondary_ip.keys()) 2745 2746 # TODO: Can the instance lock be downgraded here? Take the optional disk 2747 # shutdown in the caller into consideration. 
2748 2749 # Wait for sync 2750 # This can fail as the old devices are degraded and _WaitForSync 2751 # does a combined result over all disks, so we don't check its return value 2752 self.lu.LogStep(cstep.next(), steps_total, "Sync devices") 2753 WaitForSync(self.lu, self.instance) 2754 2755 # Check all devices manually 2756 self._CheckDevices(self.instance.primary_node, iv_names) 2757 2758 # Step: remove old storage 2759 if not self.early_release: 2760 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2761 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2762
class TemporaryDisk():
  """ Creates new temporary bootable disks, and makes sure they are destroyed.

  Is a context manager, and should be used with the ``with`` statement as such.

  The disks are inserted into the instance starting at index 0, shifting any
  other disks of the instance, and allowing the instance to be booted with the
  content of the first temporary disk.

  """

  def __init__(self, lu, instance, disks, feedback_fn,
               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
    """ Constructor storing arguments until used later.

    @type lu: L{ganeti.cmdlib.base.LogicalUnit}
    @param lu: The LU within which this disk is created.

    @type instance: L{ganeti.objects.Instance}
    @param instance: The instance to which the disk should be added

    @type disks: list of triples (disk template, disk access mode, int)
    @param disks:
      disk specification, which is a list of triples containing the
      disk template (e.g., L{constants.DT_PLAIN}), the disk access
      mode (i.e., L{constants.DISK_RDONLY} or L{constants.DISK_RDWR}),
      and size in MiB.

    @type feedback_fn: function
    @param feedback_fn: Function used to log progress

    @param shutdown_timeout: Timeout handed to the instance shutdown RPC;
      defaults to L{constants.DEFAULT_SHUTDOWN_TIMEOUT}

    """
    self._lu = lu
    self._instance = instance
    self._disks = disks
    self._feedback_fn = feedback_fn
    self._shutdown_timeout = shutdown_timeout

    # Filled in by __enter__ and consumed by __exit__
    self._undoing_info = None
    self._new_disks = []

  def _EnsureInstanceDiskState(self, action="removing"):
    """ Ensures that the instance is down, and its disks inactive.

    All the operations related to the creation and destruction of disks require
    that the instance is down and that the disks are inactive. This function is
    invoked to make it so.

    @type action: string
    @param action: Verb describing what the caller is doing with the temporary
      disk; only used to build the error message of a failed shutdown

    """
    # The instance needs to be down before any of these actions occur
    # Whether it is must be checked manually through a RPC - configuration
    # reflects only the desired state
    self._feedback_fn("Shutting down instance")
    result = self._lu.rpc.call_instance_shutdown(self._instance.primary_node,
                                                 self._instance,
                                                 self._shutdown_timeout,
                                                 self._lu.op.reason)
    result.Raise("Shutdown of instance '%s' while %s temporary disk "
                 "failed" % (self._instance.name, action))

    # Disks need to be deactivated prior to being removed
    # The disks_active configuration entry should match the actual state
    if self._instance.disks_active:
      self._feedback_fn("Deactivating disks")
      ShutdownInstanceDisks(self._lu, self._instance)

  def __enter__(self):
    """ Context manager entry function, creating the disks.

    If the disks were created but cannot be registered in the configuration,
    they are removed again before the error is propagated: the ``with``
    statement does not call L{__exit__} when this function raises.

    @rtype: list of L{ganeti.objects.Disk}
    @return: The disk objects created, in the order of the specification.

    """
    self._EnsureInstanceDiskState("creating")

    new_disks = []

    # The iv_name of the disk intentionally diverges from Ganeti's standards, as
    # this disk should be very temporary and its presence should be reported.
    # With the special iv_name, gnt-cluster verify detects the disk and warns
    # the user of its presence. Removing the disk restores the instance to its
    # proper state, despite an error that appears when the removal is performed.
    for (disk_template, disk_access, disk_size) in self._disks:
      new_disk = objects.Disk()
      new_disk.dev_type = disk_template
      new_disk.mode = disk_access
      new_disk.uuid = self._lu.cfg.GenerateUniqueID(self._lu.proc.GetECId())
      new_disk.logical_id = (self._lu.cfg.GetVGName(), new_disk.uuid)
      new_disk.params = {}
      new_disk.size = disk_size

      new_disks.append(new_disk)

    self._feedback_fn("Attempting to create temporary disk")

    self._undoing_info = CreateDisks(self._lu, self._instance, disks=new_disks)

    # Track what reached the configuration so a failure half-way through can
    # be rolled back precisely
    registered = []
    try:
      for idx, new_disk in enumerate(new_disks):
        self._lu.cfg.AddInstanceDisk(self._instance.uuid, new_disk, idx=idx)
        registered.append(new_disk)
      self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)
    except: # deliberately bare, the exception is always re-raised
      self._feedback_fn("Registering temporary disk failed, removing it again")
      # Same order as in __exit__: block devices first, then the config entries
      _UndoCreateDisks(self._lu, self._undoing_info, self._instance)
      for disk in registered:
        self._lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid)
      raise

    self._feedback_fn("Temporary disk created")

    self._new_disks = new_disks

    return new_disks

  def __exit__(self, exc_type, _value, _traceback):
    """ Context manager exit function, destroying the disks.

    Nothing is returned, so an exception raised inside the ``with`` body is
    never suppressed. A failure of the cleanup itself is reported through the
    feedback function and then re-raised.

    @param exc_type: Type of the exception raised in the ``with`` body, or
      None if the body finished normally; only used to pick the log message

    """
    if exc_type:
      self._feedback_fn("Exception raised, cleaning up temporary disk")
    else:
      self._feedback_fn("Regular cleanup of temporary disk")

    try:
      self._EnsureInstanceDiskState("removing")

      _UndoCreateDisks(self._lu, self._undoing_info, self._instance)

      for disk in self._new_disks:
        self._lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid)
      self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)

      self._feedback_fn("Temporary disk removed")
    except: # deliberately bare, the exception is always re-raised
      self._feedback_fn("Disk cleanup failed; it will have to be removed "
                        "manually")
      raise
2890