
Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with storage of instances.""" 
  32   
  33  import itertools 
  34  import logging 
  35  import os 
  36  import time 
  37   
  38  from ganeti import compat 
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import ht 
  42  from ganeti import locking 
  43  from ganeti.masterd import iallocator 
  44  from ganeti import objects 
  45  from ganeti import utils 
  46  import ganeti.rpc.node as rpc 
  47  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  48  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  49    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  50    ComputeIPolicyDiskSizesViolation, \ 
  51    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  52    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  53    CheckDiskTemplateEnabled 
  54  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  55    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  56    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  57   
  58  import ganeti.masterd.instance 
  59   
  60   
  61  _DISK_TEMPLATE_NAME_PREFIX = { 
  62    constants.DT_PLAIN: "", 
  63    constants.DT_RBD: ".rbd", 
  64    constants.DT_EXT: ".ext", 
  65    constants.DT_FILE: ".file", 
  66    constants.DT_SHARED_FILE: ".sharedfile", 
  67    } 
  68   
  69   
  70  def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open, 
  71                           excl_stor): 
  72    """Create a single block device on a given node. 
  73   
  74    This will not recurse over children of the device, so they must be 
  75    created in advance. 
  76   
  77    @param lu: the lu on whose behalf we execute 
  78    @param node_uuid: the node on which to create the device 
  79    @type instance: L{objects.Instance} 
  80    @param instance: the instance which owns the device 
  81    @type device: L{objects.Disk} 
  82    @param device: the device to create 
  83    @param info: the extra 'metadata' we should attach to the device 
  84        (this will be represented as a LVM tag) 
  85    @type force_open: boolean 
  86    @param force_open: this parameter will be passed to the 
  87        L{backend.BlockdevCreate} function where it specifies 
  88        whether we run on primary or not, and it affects both 
  89        the child assembly and the device's own Open() execution 
  90    @type excl_stor: boolean 
  91    @param excl_stor: Whether exclusive_storage is active for the node 
  92   
  93    """ 
  94    result = lu.rpc.call_blockdev_create(node_uuid, (device, instance), 
  95                                         device.size, instance.name, force_open, 
  96                                         info, excl_stor) 
  97    result.Raise("Can't create block device %s on" 
  98                 " node %s for instance %s" % (device, 
  99                                               lu.cfg.GetNodeName(node_uuid), 
 100                                               instance.name)) 
101
 102   
 103  def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create, 
 104                           info, force_open, excl_stor): 
 105    """Create a tree of block devices on a given node. 
 106   
 107    If this device type has to be created on secondaries, create it and 
 108    all its children. 
 109   
 110    If not, just recurse to children keeping the same 'force' value. 
 111   
 112    @attention: The device has to be annotated already. 
 113   
 114    @param lu: the lu on whose behalf we execute 
 115    @param node_uuid: the node on which to create the device 
 116    @type instance: L{objects.Instance} 
 117    @param instance: the instance which owns the device 
 118    @type device: L{objects.Disk} 
 119    @param device: the device to create 
 120    @type force_create: boolean 
 121    @param force_create: whether to force creation of this device; this 
 122        will be changed to True whenever we find a device which has 
 123        CreateOnSecondary() attribute 
 124    @param info: the extra 'metadata' we should attach to the device 
 125        (this will be represented as a LVM tag) 
 126    @type force_open: boolean 
 127    @param force_open: this parameter will be passed to the 
 128        L{backend.BlockdevCreate} function where it specifies 
 129        whether we run on primary or not, and it affects both 
 130        the child assembly and the device's own Open() execution 
 131    @type excl_stor: boolean 
 132    @param excl_stor: Whether exclusive_storage is active for the node 
 133   
 134    @return: list of created devices 
 135    """ 
 136    created_devices = [] 
 137    try: 
 138      if device.CreateOnSecondary(): 
 139        force_create = True 
 140   
 141      if device.children: 
 142        for child in device.children: 
 143          devs = _CreateBlockDevInner(lu, node_uuid, instance, child, 
 144                                      force_create, info, force_open, excl_stor) 
 145          created_devices.extend(devs) 
 146   
 147      if not force_create: 
 148        return created_devices 
 149   
 150      CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open, 
 151                           excl_stor) 
 152      # The device has been completely created, so there is no point in keeping 
 153      # its subdevices in the list. We just add the device itself instead. 
 154      created_devices = [(node_uuid, device)] 
 155      return created_devices 
 156   
 157    except errors.DeviceCreationError, e: 
 158      e.created_devices.extend(created_devices) 
 159      raise e 
 160    except errors.OpExecError, e: 
 161      raise errors.DeviceCreationError(str(e), created_devices) 
162
 163   
 164  def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid): 
 165    """Whether exclusive_storage is in effect for the given node. 
 166   
 167    @type cfg: L{config.ConfigWriter} 
 168    @param cfg: The cluster configuration 
 169    @type node_uuid: string 
 170    @param node_uuid: The node UUID 
 171    @rtype: bool 
 172    @return: The effective value of exclusive_storage 
 173    @raise errors.OpPrereqError: if no node exists with the given UUID 
 174   
 175    """ 
 176    ni = cfg.GetNodeInfo(node_uuid) 
 177    if ni is None: 
 178      raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid, 
 179                                 errors.ECODE_NOENT) 
 180    return IsExclusiveStorageEnabledNode(cfg, ni) 
181
 182   
 183  def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info, 
 184                      force_open): 
 185    """Wrapper around L{_CreateBlockDevInner}. 
 186   
 187    This method annotates the root device first. 
 188   
 189    """ 
 190    (disk,) = AnnotateDiskParams(instance, [device], lu.cfg) 
 191    excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid) 
 192    return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info, 
 193                                force_open, excl_stor) 
194
 195   
 196  def _UndoCreateDisks(lu, disks_created, instance): 
 197    """Undo the work performed by L{CreateDisks}. 
 198   
 199    This function is called in case of an error to undo the work of 
 200    L{CreateDisks}. 
 201   
 202    @type lu: L{LogicalUnit} 
 203    @param lu: the logical unit on whose behalf we execute 
 204    @param disks_created: the result returned by L{CreateDisks} 
 205    @type instance: L{objects.Instance} 
 206    @param instance: the instance for which disks were created 
 207   
 208    """ 
 209    for (node_uuid, disk) in disks_created: 
 210      result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance)) 
 211      result.Warn("Failed to remove newly-created disk %s on node %s" % 
 212                  (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning) 
213
 214   
 215  def CreateDisks(lu, instance, disk_template=None, 
 216                  to_skip=None, target_node_uuid=None, disks=None): 
 217    """Create all disks for an instance. 
 218   
 219    This abstracts away some work from AddInstance. 
 220   
 221    Since the instance may not have been saved to the config file yet, this 
 222    function can not query the config file for the instance's disks; in that 
 223    case they need to be passed as an argument. 
 224   
 225    This function is also used by the disk template conversion mechanism to 
 226    create the new disks of the instance. Since the instance will have the 
 227    old template at the time we create the new disks, the new template must 
 228    be passed as an extra argument. 
 229   
 230    @type lu: L{LogicalUnit} 
 231    @param lu: the logical unit on whose behalf we execute 
 232    @type instance: L{objects.Instance} 
 233    @param instance: the instance whose disks we should create 
 234    @type disk_template: string 
 235    @param disk_template: if provided, overrides the instance's disk_template 
 236    @type to_skip: list 
 237    @param to_skip: list of indices to skip 
 238    @type target_node_uuid: string 
 239    @param target_node_uuid: if passed, overrides the target node for creation 
 240    @type disks: list of L{objects.Disk} 
 241    @param disks: the disks to create; if not specified, all the disks of the 
 242        instance are created 
 243    @return: information about the created disks, to be used to call 
 244        L{_UndoCreateDisks} 
 245    @raise errors.OpPrereqError: in case of error 
 246   
 247    """ 
 248    info = GetInstanceInfoText(instance) 
 249   
 250    if disks is None: 
 251      disks = lu.cfg.GetInstanceDisks(instance.uuid) 
 252   
 253    if target_node_uuid is None: 
 254      pnode_uuid = instance.primary_node 
 255      # We cannot use config's 'GetInstanceNodes' here as 'CreateDisks' 
 256      # is used by 'LUInstanceCreate' and the instance object is not 
 257      # stored in the config yet. 
 258      all_node_uuids = [] 
 259      for disk in disks: 
 260        all_node_uuids.extend(disk.all_nodes) 
 261      all_node_uuids = set(all_node_uuids) 
 262      # ensure that primary node is always the first 
 263      all_node_uuids.discard(pnode_uuid) 
 264      all_node_uuids = [pnode_uuid] + list(all_node_uuids) 
 265    else: 
 266      pnode_uuid = target_node_uuid 
 267      all_node_uuids = [pnode_uuid] 
 268   
 269    if disk_template is None: 
 270      disk_template = instance.disk_template 
 271   
 272    CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), disk_template) 
 273   
 274    if disk_template in constants.DTS_FILEBASED: 
 275      file_storage_dir = os.path.dirname(disks[0].logical_id[1]) 
 276      result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir) 
 277   
 278      result.Raise("Failed to create directory '%s' on" 
 279                   " node %s" % (file_storage_dir, 
 280                                 lu.cfg.GetNodeName(pnode_uuid))) 
 281   
 282    disks_created = [] 
 283    for idx, device in enumerate(disks): 
 284      if to_skip and idx in to_skip: 
 285        continue 
 286      logging.info("Creating disk %s for instance '%s'", idx, instance.name) 
 287      for node_uuid in all_node_uuids: 
 288        f_create = node_uuid == pnode_uuid 
 289        try: 
 290          _CreateBlockDev(lu, node_uuid, instance, device, f_create, info, 
 291                          f_create) 
 292          disks_created.append((node_uuid, device)) 
 293        except errors.DeviceCreationError, e: 
 294          logging.warning("Creating disk %s for instance '%s' failed", 
 295                          idx, instance.name) 
 296          disks_created.extend(e.created_devices) 
 297          _UndoCreateDisks(lu, disks_created, instance) 
 298          raise errors.OpExecError(e.message) 
 299    return disks_created 
300
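The pattern above -- accumulate every successfully created (node, device) pair and hand the list to _UndoCreateDisks on the first failure -- is a plain rollback idiom. A minimal, self-contained sketch of the same idea, using hypothetical create_one/remove_one helpers in place of the real RPC calls:

def create_all_or_undo(items, create_one, remove_one):
  """Create all items; on failure remove whatever was created and re-raise."""
  created = []
  try:
    for item in items:
      create_one(item)          # may raise
      created.append(item)
  except Exception:
    # best-effort rollback, mirroring what _UndoCreateDisks does
    for item in reversed(created):
      try:
        remove_one(item)
      except Exception:
        pass                    # the real code only logs a warning here
    raise
  return created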
 301   
 302  def ComputeDiskSizePerVG(disk_template, disks): 
 303    """Compute disk size requirements in the volume group 
 304   
 305    """ 
 306    def _compute(disks, payload): 
 307      """Universal algorithm. 
 308   
 309      """ 
 310      vgs = {} 
 311      for disk in disks: 
 312        vg_name = disk[constants.IDISK_VG] 
 313        vgs[vg_name] = \ 
 314          vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload 
 315   
 316      return vgs 
 317   
 318    # Required free disk space as a function of disk and swap space 
 319    req_size_dict = { 
 320      constants.DT_DISKLESS: {}, 
 321      constants.DT_PLAIN: _compute(disks, 0), 
 322      # 128 MB are added for drbd metadata for each disk 
 323      constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE), 
 324      constants.DT_FILE: {}, 
 325      constants.DT_SHARED_FILE: {}, 
 326      constants.DT_GLUSTER: {}, 
 327      } 
 328   
 329    if disk_template not in req_size_dict: 
 330      raise errors.ProgrammerError("Disk template '%s' size requirement" 
 331                                   " is unknown" % disk_template) 
 332   
 333    return req_size_dict[disk_template] 
 334   
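As a worked example of the helper above: for two requested disks of 10240 MiB in vg "xenvg" and 2048 MiB in vg "data", a DRBD8 template adds the per-disk metadata overhead (128 MiB, per the comment above), giving {"xenvg": 10368, "data": 2176}. A standalone sketch of the same arithmetic, without the ganeti constants:

DRBD_META_SIZE = 128  # MiB of DRBD metadata per disk (value taken from the comment above)

def size_per_vg(disks, per_disk_overhead):
  """Sum requested sizes per volume group, adding a fixed per-disk overhead."""
  vgs = {}
  for disk in disks:
    vgs[disk["vg"]] = vgs.get(disk["vg"], 0) + disk["size"] + per_disk_overhead
  return vgs

disks = [{"vg": "xenvg", "size": 10240}, {"vg": "data", "size": 2048}]
assert size_per_vg(disks, 0) == {"xenvg": 10240, "data": 2048}               # plain
assert size_per_vg(disks, DRBD_META_SIZE) == {"xenvg": 10368, "data": 2176}  # drbd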
335 336 -def ComputeDisks(disks, disk_template, default_vg):
337 """Computes the instance disks. 338 339 @type disks: list of dictionaries 340 @param disks: The disks' input dictionary 341 @type disk_template: string 342 @param disk_template: The disk template of the instance 343 @type default_vg: string 344 @param default_vg: The default_vg to assume 345 346 @return: The computed disks 347 348 """ 349 new_disks = [] 350 for disk in disks: 351 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR) 352 if mode not in constants.DISK_ACCESS_SET: 353 raise errors.OpPrereqError("Invalid disk access mode '%s'" % 354 mode, errors.ECODE_INVAL) 355 size = disk.get(constants.IDISK_SIZE, None) 356 if size is None: 357 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) 358 try: 359 size = int(size) 360 except (TypeError, ValueError): 361 raise errors.OpPrereqError("Invalid disk size '%s'" % size, 362 errors.ECODE_INVAL) 363 364 ext_provider = disk.get(constants.IDISK_PROVIDER, None) 365 if ext_provider and disk_template != constants.DT_EXT: 366 raise errors.OpPrereqError("The '%s' option is only valid for the %s" 367 " disk template, not %s" % 368 (constants.IDISK_PROVIDER, constants.DT_EXT, 369 disk_template), errors.ECODE_INVAL) 370 371 data_vg = disk.get(constants.IDISK_VG, default_vg) 372 name = disk.get(constants.IDISK_NAME, None) 373 if name is not None and name.lower() == constants.VALUE_NONE: 374 name = None 375 new_disk = { 376 constants.IDISK_SIZE: size, 377 constants.IDISK_MODE: mode, 378 constants.IDISK_VG: data_vg, 379 constants.IDISK_NAME: name, 380 } 381 382 for key in [ 383 constants.IDISK_METAVG, 384 constants.IDISK_ADOPT, 385 constants.IDISK_SPINDLES, 386 ]: 387 if key in disk: 388 new_disk[key] = disk[key] 389 390 # Add IDISK_ACCESS parameter for disk templates that support it 391 if (disk_template in constants.DTS_HAVE_ACCESS and 392 constants.IDISK_ACCESS in disk): 393 new_disk[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS] 394 395 # For extstorage, demand the `provider' option and add any 396 # additional parameters (ext-params) to the dict 397 if disk_template == constants.DT_EXT: 398 if ext_provider: 399 new_disk[constants.IDISK_PROVIDER] = ext_provider 400 for key in disk: 401 if key not in constants.IDISK_PARAMS: 402 new_disk[key] = disk[key] 403 else: 404 raise errors.OpPrereqError("Missing provider for template '%s'" % 405 constants.DT_EXT, errors.ECODE_INVAL) 406 407 new_disks.append(new_disk) 408 409 return new_disks
410
411 412 -def ComputeDisksInfo(disks, disk_template, default_vg, ext_params):
413 """Computes the new instance's disks for the template conversion. 414 415 This method is used by the disks template conversion mechanism. Using the 416 'ComputeDisks' method as an auxiliary method computes the disks that will be 417 used for generating the new disk template of the instance. It computes the 418 size, mode, and name parameters from the instance's current disks, such as 419 the volume group and the access parameters for the templates that support 420 them. For conversions targeting an extstorage template, the mandatory 421 provider's name or any user-provided extstorage parameters will also be 422 included in the result. 423 424 @type disks: list of {objects.Disk} 425 @param disks: The current disks of the instance 426 @type disk_template: string 427 @param disk_template: The disk template of the instance 428 @type default_vg: string 429 @param default_vg: The default volume group to assume 430 @type ext_params: dict 431 @param ext_params: The extstorage parameters 432 433 @rtype: list of dictionaries 434 @return: The computed disks' information for the new template 435 436 """ 437 # Ensure 'ext_params' does not violate existing disks' params 438 for key in ext_params.keys(): 439 if key != constants.IDISK_PROVIDER: 440 assert key not in constants.IDISK_PARAMS, \ 441 "Invalid extstorage parameter '%s'" % key 442 443 # Prepare the disks argument for the 'ComputeDisks' method. 444 inst_disks = [dict((key, value) for key, value in disk.iteritems() 445 if key in constants.IDISK_PARAMS) 446 for disk in map(objects.Disk.ToDict, disks)] 447 448 # Update disks with the user-provided 'ext_params'. 449 for disk in inst_disks: 450 disk.update(ext_params) 451 452 # Compute the new disks' information. 453 new_disks = ComputeDisks(inst_disks, disk_template, default_vg) 454 455 # Add missing parameters to the previously computed disks. 456 for disk, new_disk in zip(disks, new_disks): 457 # Conversions between ExtStorage templates allowed only for different 458 # providers. 459 if (disk.dev_type == disk_template and 460 disk_template == constants.DT_EXT): 461 provider = new_disk[constants.IDISK_PROVIDER] 462 if provider == disk.params[constants.IDISK_PROVIDER]: 463 raise errors.OpPrereqError("Not converting, '%s' of type ExtStorage" 464 " already using provider '%s'" % 465 (disk.iv_name, provider), errors.ECODE_INVAL) 466 467 # Add IDISK_ACCESS parameter for conversions between disk templates that 468 # support it. 469 if (disk_template in constants.DTS_HAVE_ACCESS and 470 constants.IDISK_ACCESS in disk.params): 471 new_disk[constants.IDISK_ACCESS] = disk.params[constants.IDISK_ACCESS] 472 473 # For LVM-based conversions (plain <-> drbd) use the same volume group. 474 if disk_template in constants.DTS_LVM: 475 if disk.dev_type == constants.DT_PLAIN: 476 new_disk[constants.IDISK_VG] = disk.logical_id[0] 477 elif disk.dev_type == constants.DT_DRBD8: 478 new_disk[constants.IDISK_VG] = disk.children[0].logical_id[0] 479 480 return new_disks
481
482 483 -def CalculateFileStorageDir(lu):
484 """Calculate final instance file storage dir. 485 486 @type lu: L{LogicalUnit} 487 @param lu: the logical unit on whose behalf we execute 488 489 @rtype: string 490 @return: The file storage directory for the instance 491 492 """ 493 # file storage dir calculation/check 494 instance_file_storage_dir = None 495 if lu.op.disk_template in constants.DTS_FILEBASED: 496 # build the full file storage dir path 497 joinargs = [] 498 499 cfg_storage = None 500 if lu.op.disk_template == constants.DT_FILE: 501 cfg_storage = lu.cfg.GetFileStorageDir() 502 elif lu.op.disk_template == constants.DT_SHARED_FILE: 503 cfg_storage = lu.cfg.GetSharedFileStorageDir() 504 elif lu.op.disk_template == constants.DT_GLUSTER: 505 cfg_storage = lu.cfg.GetGlusterStorageDir() 506 507 if not cfg_storage: 508 raise errors.OpPrereqError( 509 "Cluster file storage dir for {tpl} storage type not defined".format( 510 tpl=repr(lu.op.disk_template) 511 ), 512 errors.ECODE_STATE 513 ) 514 515 joinargs.append(cfg_storage) 516 517 if lu.op.file_storage_dir is not None: 518 joinargs.append(lu.op.file_storage_dir) 519 520 if lu.op.disk_template != constants.DT_GLUSTER: 521 joinargs.append(lu.op.instance_name) 522 523 if len(joinargs) > 1: 524 # pylint: disable=W0142 525 instance_file_storage_dir = utils.PathJoin(*joinargs) 526 else: 527 instance_file_storage_dir = joinargs[0] 528 529 return instance_file_storage_dir
530
 531   
 532  def CheckRADOSFreeSpace(): 
 533    """Compute disk size requirements inside the RADOS cluster. 
 534   
 535    """ 
 536    # For the RADOS cluster we assume there is always enough space. 
 537    pass 
538
539 540 -def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names, 541 iv_name, p_minor, s_minor):
542 """Generate a drbd8 device complete with its children. 543 544 """ 545 assert len(vgnames) == len(names) == 2 546 port = lu.cfg.AllocatePort() 547 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) 548 549 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size, 550 logical_id=(vgnames[0], names[0]), 551 params={}) 552 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 553 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN, 554 size=constants.DRBD_META_SIZE, 555 logical_id=(vgnames[1], names[1]), 556 params={}) 557 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 558 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size, 559 logical_id=(primary_uuid, secondary_uuid, port, 560 p_minor, s_minor, 561 shared_secret), 562 children=[dev_data, dev_meta], 563 iv_name=iv_name, params={}) 564 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 565 return drbd_dev
566
567 568 -def GenerateDiskTemplate( 569 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids, 570 disk_info, file_storage_dir, file_driver, base_index, 571 feedback_fn, full_disk_params):
572 """Generate the entire disk layout for a given template type. 573 574 """ 575 vgname = lu.cfg.GetVGName() 576 disk_count = len(disk_info) 577 disks = [] 578 579 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name) 580 581 if template_name == constants.DT_DISKLESS: 582 pass 583 elif template_name == constants.DT_DRBD8: 584 if len(secondary_node_uuids) != 1: 585 raise errors.ProgrammerError("Wrong template configuration") 586 remote_node_uuid = secondary_node_uuids[0] 587 minors = lu.cfg.AllocateDRBDMinor( 588 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid) 589 590 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name, 591 full_disk_params) 592 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] 593 594 names = [] 595 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) 596 for i in range(disk_count)]): 597 names.append(lv_prefix + "_data") 598 names.append(lv_prefix + "_meta") 599 for idx, disk in enumerate(disk_info): 600 disk_index = idx + base_index 601 data_vg = disk.get(constants.IDISK_VG, vgname) 602 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) 603 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid, 604 disk[constants.IDISK_SIZE], 605 [data_vg, meta_vg], 606 names[idx * 2:idx * 2 + 2], 607 "disk/%d" % disk_index, 608 minors[idx * 2], minors[idx * 2 + 1]) 609 disk_dev.mode = disk[constants.IDISK_MODE] 610 disk_dev.name = disk.get(constants.IDISK_NAME, None) 611 disks.append(disk_dev) 612 else: 613 if secondary_node_uuids: 614 raise errors.ProgrammerError("Wrong template configuration") 615 616 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None) 617 if name_prefix is None: 618 names = None 619 else: 620 names = _GenerateUniqueNames(lu, ["%s.disk%s" % 621 (name_prefix, base_index + i) 622 for i in range(disk_count)]) 623 624 if template_name == constants.DT_PLAIN: 625 626 def logical_id_fn(idx, _, disk): 627 vg = disk.get(constants.IDISK_VG, vgname) 628 return (vg, names[idx])
629 630 elif template_name == constants.DT_GLUSTER: 631 logical_id_fn = lambda _1, disk_index, _2: \ 632 (file_driver, "ganeti/%s.%d" % (instance_uuid, 633 disk_index)) 634 635 elif template_name in constants.DTS_FILEBASED: # Gluster handled above 636 logical_id_fn = \ 637 lambda _, disk_index, disk: (file_driver, 638 "%s/%s" % (file_storage_dir, 639 names[idx])) 640 elif template_name == constants.DT_BLOCK: 641 logical_id_fn = \ 642 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL, 643 disk[constants.IDISK_ADOPT]) 644 elif template_name == constants.DT_RBD: 645 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) 646 elif template_name == constants.DT_EXT: 647 def logical_id_fn(idx, _, disk): 648 provider = disk.get(constants.IDISK_PROVIDER, None) 649 if provider is None: 650 raise errors.ProgrammerError("Disk template is %s, but '%s' is" 651 " not found", constants.DT_EXT, 652 constants.IDISK_PROVIDER) 653 return (provider, names[idx]) 654 else: 655 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) 656 657 dev_type = template_name 658 659 for idx, disk in enumerate(disk_info): 660 params = {} 661 # Only for the Ext template add disk_info to params 662 if template_name == constants.DT_EXT: 663 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER] 664 for key in disk: 665 if key not in constants.IDISK_PARAMS: 666 params[key] = disk[key] 667 # Add IDISK_ACCESS param to disk params 668 if (template_name in constants.DTS_HAVE_ACCESS and 669 constants.IDISK_ACCESS in disk): 670 params[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS] 671 disk_index = idx + base_index 672 size = disk[constants.IDISK_SIZE] 673 feedback_fn("* disk %s, size %s" % 674 (disk_index, utils.FormatUnit(size, "h"))) 675 disk_dev = objects.Disk(dev_type=dev_type, size=size, 676 logical_id=logical_id_fn(idx, disk_index, disk), 677 iv_name="disk/%d" % disk_index, 678 mode=disk[constants.IDISK_MODE], 679 params=params, 680 spindles=disk.get(constants.IDISK_SPINDLES)) 681 disk_dev.name = disk.get(constants.IDISK_NAME, None) 682 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 683 disks.append(disk_dev) 684 685 return disks 686
 687   
 688  def CheckSpindlesExclusiveStorage(diskdict, es_flag, required): 
 689    """Check the presence of the spindle options with exclusive_storage. 
 690   
 691    @type diskdict: dict 
 692    @param diskdict: disk parameters 
 693    @type es_flag: bool 
 694    @param es_flag: the effective value of the exclusive_storage flag 
 695    @type required: bool 
 696    @param required: whether spindles are required or just optional 
 697    @raise errors.OpPrereqError: when spindles are given and they should not 
 698   
 699    """ 
 700    if (not es_flag and constants.IDISK_SPINDLES in diskdict and 
 701        diskdict[constants.IDISK_SPINDLES] is not None): 
 702      raise errors.OpPrereqError("Spindles in instance disks cannot be specified" 
 703                                 " when exclusive storage is not active", 
 704                                 errors.ECODE_INVAL) 
 705    if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or 
 706                                  diskdict[constants.IDISK_SPINDLES] is None)): 
 707      raise errors.OpPrereqError("You must specify spindles in instance disks" 
 708                                 " when exclusive storage is active", 
 709                                 errors.ECODE_INVAL) 
710
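The two checks above encode a small truth table: spindles are rejected when exclusive storage is off, and demanded (when 'required') while it is on. A simplified, self-contained stand-in illustrating the accepted and rejected combinations, with ValueError in place of OpPrereqError:

def check_spindles(spindles, es_flag, required):
  """Simplified stand-in mirroring CheckSpindlesExclusiveStorage."""
  if not es_flag and spindles is not None:
    raise ValueError("spindles given without exclusive storage")
  if es_flag and required and spindles is None:
    raise ValueError("spindles required with exclusive storage")

check_spindles(None, False, False)   # ok: no spindles, no exclusive storage
check_spindles(2, True, True)        # ok: spindles given, exclusive storage on
try:
  check_spindles(2, False, False)    # rejected: spindles without exclusive storage
except ValueError:
  pass
try:
  check_spindles(None, True, True)   # rejected: spindles missing but required
except ValueError:
  pass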
711 712 -class LUInstanceRecreateDisks(LogicalUnit):
713 """Recreate an instance's missing disks. 714 715 """ 716 HPATH = "instance-recreate-disks" 717 HTYPE = constants.HTYPE_INSTANCE 718 REQ_BGL = False 719 720 _MODIFYABLE = compat.UniqueFrozenset([ 721 constants.IDISK_SIZE, 722 constants.IDISK_MODE, 723 constants.IDISK_SPINDLES, 724 ]) 725 726 # New or changed disk parameters may have different semantics 727 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ 728 constants.IDISK_ADOPT, 729 730 # TODO: Implement support changing VG while recreating 731 constants.IDISK_VG, 732 constants.IDISK_METAVG, 733 constants.IDISK_PROVIDER, 734 constants.IDISK_NAME, 735 constants.IDISK_ACCESS, 736 ])) 737
738 - def _RunAllocator(self):
739 """Run the allocator based on input opcode. 740 741 """ 742 be_full = self.cfg.GetClusterInfo().FillBE(self.instance) 743 744 # FIXME 745 # The allocator should actually run in "relocate" mode, but current 746 # allocators don't support relocating all the nodes of an instance at 747 # the same time. As a workaround we use "allocate" mode, but this is 748 # suboptimal for two reasons: 749 # - The instance name passed to the allocator is present in the list of 750 # existing instances, so there could be a conflict within the 751 # internal structures of the allocator. This doesn't happen with the 752 # current allocators, but it's a liability. 753 # - The allocator counts the resources used by the instance twice: once 754 # because the instance exists already, and once because it tries to 755 # allocate a new instance. 756 # The allocator could choose some of the nodes on which the instance is 757 # running, but that's not a problem. If the instance nodes are broken, 758 # they should be already be marked as drained or offline, and hence 759 # skipped by the allocator. If instance disks have been lost for other 760 # reasons, then recreating the disks on the same nodes should be fine. 761 disk_template = self.instance.disk_template 762 spindle_use = be_full[constants.BE_SPINDLE_USE] 763 disks = [{ 764 constants.IDISK_SIZE: d.size, 765 constants.IDISK_MODE: d.mode, 766 constants.IDISK_SPINDLES: d.spindles, 767 } for d in self.cfg.GetInstanceDisks(self.instance.uuid)] 768 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, 769 disk_template=disk_template, 770 group_name=None, 771 tags=list(self.instance.GetTags()), 772 os=self.instance.os, 773 nics=[{}], 774 vcpus=be_full[constants.BE_VCPUS], 775 memory=be_full[constants.BE_MAXMEM], 776 spindle_use=spindle_use, 777 disks=disks, 778 hypervisor=self.instance.hypervisor, 779 node_whitelist=None) 780 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 781 782 ial.Run(self.op.iallocator) 783 784 assert req.RequiredNodes() == \ 785 len(self.cfg.GetInstanceNodes(self.instance.uuid)) 786 787 if not ial.success: 788 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" 789 " %s" % (self.op.iallocator, ial.info), 790 errors.ECODE_NORES) 791 792 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result) 793 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", 794 self.op.instance_name, self.op.iallocator, 795 utils.CommaJoin(self.op.nodes))
796
797 - def CheckArguments(self):
798 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]): 799 # Normalize and convert deprecated list of disk indices 800 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] 801 802 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) 803 if duplicates: 804 raise errors.OpPrereqError("Some disks have been specified more than" 805 " once: %s" % utils.CommaJoin(duplicates), 806 errors.ECODE_INVAL) 807 808 # We don't want _CheckIAllocatorOrNode selecting the default iallocator 809 # when neither iallocator nor nodes are specified 810 if self.op.iallocator or self.op.nodes: 811 CheckIAllocatorOrNode(self, "iallocator", "nodes") 812 813 for (idx, params) in self.op.disks: 814 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) 815 unsupported = frozenset(params.keys()) - self._MODIFYABLE 816 if unsupported: 817 raise errors.OpPrereqError("Parameters for disk %s try to change" 818 " unmodifyable parameter(s): %s" % 819 (idx, utils.CommaJoin(unsupported)), 820 errors.ECODE_INVAL)
821
822 - def ExpandNames(self):
823 self._ExpandAndLockInstance() 824 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND 825 826 if self.op.nodes: 827 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes) 828 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids) 829 else: 830 self.needed_locks[locking.LEVEL_NODE] = [] 831 if self.op.iallocator: 832 # iallocator will select a new node in the same group 833 self.needed_locks[locking.LEVEL_NODEGROUP] = [] 834 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET 835 836 self.needed_locks[locking.LEVEL_NODE_RES] = [] 837 838 self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True 839 self.dont_collate_locks[locking.LEVEL_NODE] = True 840 self.dont_collate_locks[locking.LEVEL_NODE_RES] = True
841
842 - def DeclareLocks(self, level):
843 if level == locking.LEVEL_NODEGROUP: 844 assert self.op.iallocator is not None 845 assert not self.op.nodes 846 assert not self.needed_locks[locking.LEVEL_NODEGROUP] 847 self.share_locks[locking.LEVEL_NODEGROUP] = 1 848 # Lock the primary group used by the instance optimistically; this 849 # requires going via the node before it's locked, requiring 850 # verification later on 851 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 852 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True) 853 854 elif level == locking.LEVEL_NODE: 855 # If an allocator is used, then we lock all the nodes in the current 856 # instance group, as we don't know yet which ones will be selected; 857 # if we replace the nodes without using an allocator, locks are 858 # already declared in ExpandNames; otherwise, we need to lock all the 859 # instance nodes for disk re-creation 860 if self.op.iallocator: 861 assert not self.op.nodes 862 assert not self.needed_locks[locking.LEVEL_NODE] 863 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1 864 865 # Lock member nodes of the group of the primary node 866 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): 867 self.needed_locks[locking.LEVEL_NODE].extend( 868 self.cfg.GetNodeGroup(group_uuid).members) 869 870 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) 871 elif not self.op.nodes: 872 self._LockInstancesNodes(primary_only=False) 873 elif level == locking.LEVEL_NODE_RES: 874 # Copy node locks 875 self.needed_locks[locking.LEVEL_NODE_RES] = \ 876 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
877
 878    def BuildHooksEnv(self): 
 879      """Build hooks env. 
 880   
 881      This runs on master, primary and secondary nodes of the instance. 
 882   
 883      """ 
 884      return BuildInstanceHookEnvByObject(self, self.instance) 
885
 886    def BuildHooksNodes(self): 
 887      """Build hooks nodes. 
 888   
 889      """ 
 890      nl = [self.cfg.GetMasterNode()] + \ 
 891        list(self.cfg.GetInstanceNodes(self.instance.uuid)) 
 892      return (nl, nl) 
893
894 - def CheckPrereq(self):
895 """Check prerequisites. 896 897 This checks that the instance is in the cluster and is not running. 898 899 """ 900 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid) 901 assert instance is not None, \ 902 "Cannot retrieve locked instance %s" % self.op.instance_name 903 if self.op.node_uuids: 904 inst_nodes = self.cfg.GetInstanceNodes(instance.uuid) 905 if len(self.op.node_uuids) != len(inst_nodes): 906 raise errors.OpPrereqError("Instance %s currently has %d nodes, but" 907 " %d replacement nodes were specified" % 908 (instance.name, len(inst_nodes), 909 len(self.op.node_uuids)), 910 errors.ECODE_INVAL) 911 assert instance.disk_template != constants.DT_DRBD8 or \ 912 len(self.op.node_uuids) == 2 913 assert instance.disk_template != constants.DT_PLAIN or \ 914 len(self.op.node_uuids) == 1 915 primary_node = self.op.node_uuids[0] 916 else: 917 primary_node = instance.primary_node 918 if not self.op.iallocator: 919 CheckNodeOnline(self, primary_node) 920 921 if instance.disk_template == constants.DT_DISKLESS: 922 raise errors.OpPrereqError("Instance '%s' has no disks" % 923 self.op.instance_name, errors.ECODE_INVAL) 924 925 # Verify if node group locks are still correct 926 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 927 if owned_groups: 928 # Node group locks are acquired only for the primary node (and only 929 # when the allocator is used) 930 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups, 931 primary_only=True) 932 933 # if we replace nodes *and* the old primary is offline, we don't 934 # check the instance state 935 old_pnode = self.cfg.GetNodeInfo(instance.primary_node) 936 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline): 937 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, 938 msg="cannot recreate disks") 939 940 if self.op.disks: 941 self.disks = dict(self.op.disks) 942 else: 943 self.disks = dict((idx, {}) for idx in range(len(instance.disks))) 944 945 maxidx = max(self.disks.keys()) 946 if maxidx >= len(instance.disks): 947 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, 948 errors.ECODE_INVAL) 949 950 if ((self.op.node_uuids or self.op.iallocator) and 951 sorted(self.disks.keys()) != range(len(instance.disks))): 952 raise errors.OpPrereqError("Can't recreate disks partially and" 953 " change the nodes at the same time", 954 errors.ECODE_INVAL) 955 956 self.instance = instance 957 958 if self.op.iallocator: 959 self._RunAllocator() 960 # Release unneeded node and node resource locks 961 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids) 962 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids) 963 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) 964 965 if self.op.node_uuids: 966 node_uuids = self.op.node_uuids 967 else: 968 node_uuids = self.cfg.GetInstanceNodes(instance.uuid) 969 excl_stor = compat.any( 970 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values() 971 ) 972 for new_params in self.disks.values(): 973 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
974
975 - def Exec(self, feedback_fn):
976 """Recreate the disks. 977 978 """ 979 assert (self.owned_locks(locking.LEVEL_NODE) == 980 self.owned_locks(locking.LEVEL_NODE_RES)) 981 982 to_skip = [] 983 mods = [] # keeps track of needed changes 984 985 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 986 for idx, disk in enumerate(inst_disks): 987 try: 988 changes = self.disks[idx] 989 except KeyError: 990 # Disk should not be recreated 991 to_skip.append(idx) 992 continue 993 994 # update secondaries for disks, if needed 995 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8: 996 # need to update the nodes and minors 997 assert len(self.op.node_uuids) == 2 998 assert len(disk.logical_id) == 6 # otherwise disk internals 999 # have changed 1000 (_, _, old_port, _, _, old_secret) = disk.logical_id 1001 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids, 1002 self.instance.uuid) 1003 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port, 1004 new_minors[0], new_minors[1], old_secret) 1005 assert len(disk.logical_id) == len(new_id) 1006 else: 1007 new_id = None 1008 1009 mods.append((idx, new_id, changes)) 1010 1011 # now that we have passed all asserts above, we can apply the mods 1012 # in a single run (to avoid partial changes) 1013 for idx, new_id, changes in mods: 1014 disk = inst_disks[idx] 1015 if new_id is not None: 1016 assert disk.dev_type == constants.DT_DRBD8 1017 disk.logical_id = new_id 1018 if changes: 1019 disk.Update(size=changes.get(constants.IDISK_SIZE, None), 1020 mode=changes.get(constants.IDISK_MODE, None), 1021 spindles=changes.get(constants.IDISK_SPINDLES, None)) 1022 self.cfg.Update(disk, feedback_fn) 1023 1024 # change primary node, if needed 1025 if self.op.node_uuids: 1026 self.instance.primary_node = self.op.node_uuids[0] 1027 self.LogWarning("Changing the instance's nodes, you will have to" 1028 " remove any disks left on the older nodes manually") 1029 1030 if self.op.node_uuids: 1031 self.cfg.Update(self.instance, feedback_fn) 1032 1033 # All touched nodes must be locked 1034 mylocks = self.owned_locks(locking.LEVEL_NODE) 1035 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid) 1036 assert mylocks.issuperset(frozenset(inst_nodes)) 1037 new_disks = CreateDisks(self, self.instance, to_skip=to_skip) 1038 1039 # TODO: Release node locks before wiping, or explain why it's not possible 1040 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 1041 if self.cfg.GetClusterInfo().prealloc_wipe_disks: 1042 wipedisks = [(idx, disk, 0) 1043 for (idx, disk) in enumerate(inst_disks) 1044 if idx not in to_skip] 1045 WipeOrCleanupDisks(self, self.instance, disks=wipedisks, 1046 cleanup=new_disks)
1047
1048 1049 -def _PerformNodeInfoCall(lu, node_uuids, vg):
1050 """Prepares the input and performs a node info call. 1051 1052 @type lu: C{LogicalUnit} 1053 @param lu: a logical unit from which we get configuration data 1054 @type node_uuids: list of string 1055 @param node_uuids: list of node UUIDs to perform the call for 1056 @type vg: string 1057 @param vg: the volume group's name 1058 1059 """ 1060 lvm_storage_units = [(constants.ST_LVM_VG, vg)] 1061 storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units, 1062 node_uuids) 1063 hvname = lu.cfg.GetHypervisorType() 1064 hvparams = lu.cfg.GetClusterInfo().hvparams 1065 nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units, 1066 [(hvname, hvparams[hvname])]) 1067 return nodeinfo
1068
1069 1070 -def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
1071 """Checks the vg capacity for a given node. 1072 1073 @type node_info: tuple (_, list of dicts, _) 1074 @param node_info: the result of the node info call for one node 1075 @type node_name: string 1076 @param node_name: the name of the node 1077 @type vg: string 1078 @param vg: volume group name 1079 @type requested: int 1080 @param requested: the amount of disk in MiB to check for 1081 @raise errors.OpPrereqError: if the node doesn't have enough disk, 1082 or we cannot check the node 1083 1084 """ 1085 (_, space_info, _) = node_info 1086 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType( 1087 space_info, constants.ST_LVM_VG) 1088 if not lvm_vg_info: 1089 raise errors.OpPrereqError("Can't retrieve storage information for LVM") 1090 vg_free = lvm_vg_info.get("storage_free", None) 1091 if not isinstance(vg_free, int): 1092 raise errors.OpPrereqError("Can't compute free disk space on node" 1093 " %s for vg %s, result was '%s'" % 1094 (node_name, vg, vg_free), errors.ECODE_ENVIRON) 1095 if requested > vg_free: 1096 raise errors.OpPrereqError("Not enough disk space on target node %s" 1097 " vg %s: required %d MiB, available %d MiB" % 1098 (node_name, vg, requested, vg_free), 1099 errors.ECODE_NORES)
1100
1101 1102 -def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
1103 """Checks if nodes have enough free disk space in the specified VG. 1104 1105 This function checks if all given nodes have the needed amount of 1106 free disk. In case any node has less disk or we cannot get the 1107 information from the node, this function raises an OpPrereqError 1108 exception. 1109 1110 @type lu: C{LogicalUnit} 1111 @param lu: a logical unit from which we get configuration data 1112 @type node_uuids: C{list} 1113 @param node_uuids: the list of node UUIDs to check 1114 @type vg: C{str} 1115 @param vg: the volume group to check 1116 @type requested: C{int} 1117 @param requested: the amount of disk in MiB to check for 1118 @raise errors.OpPrereqError: if the node doesn't have enough disk, 1119 or we cannot check the node 1120 1121 """ 1122 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg) 1123 for node_uuid in node_uuids: 1124 node_name = lu.cfg.GetNodeName(node_uuid) 1125 info = nodeinfo[node_uuid] 1126 info.Raise("Cannot get current information from node %s" % node_name, 1127 prereq=True, ecode=errors.ECODE_ENVIRON) 1128 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
1129
1130 1131 -def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
1132 """Checks if nodes have enough free disk space in all the VGs. 1133 1134 This function checks if all given nodes have the needed amount of 1135 free disk. In case any node has less disk or we cannot get the 1136 information from the node, this function raises an OpPrereqError 1137 exception. 1138 1139 @type lu: C{LogicalUnit} 1140 @param lu: a logical unit from which we get configuration data 1141 @type node_uuids: C{list} 1142 @param node_uuids: the list of node UUIDs to check 1143 @type req_sizes: C{dict} 1144 @param req_sizes: the hash of vg and corresponding amount of disk in 1145 MiB to check for 1146 @raise errors.OpPrereqError: if the node doesn't have enough disk, 1147 or we cannot check the node 1148 1149 """ 1150 for vg, req_size in req_sizes.items(): 1151 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
1152
1153   
1154  def _DiskSizeInBytesToMebibytes(lu, size): 
1155    """Converts a disk size in bytes to mebibytes. 
1156   
1157    Warns and rounds up if the size isn't an even multiple of 1 MiB. 
1158   
1159    """ 
1160    (mib, remainder) = divmod(size, 1024 * 1024) 
1161   
1162    if remainder != 0: 
1163      lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 
1164                    " to not overwrite existing data (%s bytes will not be" 
1165                    " wiped)", (1024 * 1024) - remainder) 
1166      mib += 1 
1167   
1168    return mib 
1169
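For example, 1 GiB (1073741824 bytes) converts to exactly 1024 MiB, while 1073741825 bytes leaves a 1-byte remainder and is rounded up to 1025 MiB so that the trailing partial mebibyte is not overwritten. The same arithmetic, stripped of the LU warning:

def bytes_to_mib_round_up(size):
  """Convert bytes to MiB, rounding any partial mebibyte up."""
  mib, remainder = divmod(size, 1024 * 1024)
  if remainder != 0:
    mib += 1
  return mib

assert bytes_to_mib_round_up(1073741824) == 1024
assert bytes_to_mib_round_up(1073741825) == 1025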
1170   
1171  def _CalcEta(time_taken, written, total_size): 
1172    """Calculates the ETA based on size written and total size. 
1173   
1174    @param time_taken: The time taken so far 
1175    @param written: amount written so far 
1176    @param total_size: The total size of data to be written 
1177    @return: The remaining time in seconds 
1178   
1179    """ 
1180    avg_time = time_taken / float(written) 
1181    return (total_size - written) * avg_time 
1182
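The estimate is a simple linear extrapolation: remaining = (total_size - written) * (time_taken / written). For instance, after wiping 2048 MiB of an 8192 MiB disk in 120 seconds, the average rate gives an ETA of 360 seconds for the remaining 6144 MiB:

def calc_eta(time_taken, written, total_size):
  """Linear ETA, same formula as _CalcEta above."""
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time

assert calc_eta(120.0, 2048, 8192) == 360.0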
1183 1184 -def WipeDisks(lu, instance, disks=None):
1185 """Wipes instance disks. 1186 1187 @type lu: L{LogicalUnit} 1188 @param lu: the logical unit on whose behalf we execute 1189 @type instance: L{objects.Instance} 1190 @param instance: the instance whose disks we should create 1191 @type disks: None or list of tuple of (number, L{objects.Disk}, number) 1192 @param disks: Disk details; tuple contains disk index, disk object and the 1193 start offset 1194 1195 """ 1196 node_uuid = instance.primary_node 1197 node_name = lu.cfg.GetNodeName(node_uuid) 1198 1199 if disks is None: 1200 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid) 1201 disks = [(idx, disk, 0) 1202 for (idx, disk) in enumerate(inst_disks)] 1203 1204 logging.info("Pausing synchronization of disks of instance '%s'", 1205 instance.name) 1206 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid, 1207 (map(compat.snd, disks), 1208 instance), 1209 True) 1210 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name) 1211 1212 for idx, success in enumerate(result.payload): 1213 if not success: 1214 logging.warn("Pausing synchronization of disk %s of instance '%s'" 1215 " failed", idx, instance.name) 1216 1217 try: 1218 for (idx, device, offset) in disks: 1219 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but 1220 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors. 1221 wipe_chunk_size = \ 1222 int(min(constants.MAX_WIPE_CHUNK, 1223 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)) 1224 1225 size = device.size 1226 last_output = 0 1227 start_time = time.time() 1228 1229 if offset == 0: 1230 info_text = "" 1231 else: 1232 info_text = (" (from %s to %s)" % 1233 (utils.FormatUnit(offset, "h"), 1234 utils.FormatUnit(size, "h"))) 1235 1236 lu.LogInfo("* Wiping disk %s%s", idx, info_text) 1237 1238 logging.info("Wiping disk %d for instance %s on node %s using" 1239 " chunk size %s", idx, instance.name, node_name, 1240 wipe_chunk_size) 1241 1242 while offset < size: 1243 wipe_size = min(wipe_chunk_size, size - offset) 1244 1245 logging.debug("Wiping disk %d, offset %s, chunk %s", 1246 idx, offset, wipe_size) 1247 1248 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance), 1249 offset, wipe_size) 1250 result.Raise("Could not wipe disk %d at offset %d for size %d" % 1251 (idx, offset, wipe_size)) 1252 1253 now = time.time() 1254 offset += wipe_size 1255 if now - last_output >= 60: 1256 eta = _CalcEta(now - start_time, offset, size) 1257 lu.LogInfo(" - done: %.1f%% ETA: %s", 1258 offset / float(size) * 100, utils.FormatSeconds(eta)) 1259 last_output = now 1260 finally: 1261 logging.info("Resuming synchronization of disks for instance '%s'", 1262 instance.name) 1263 1264 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid, 1265 (map(compat.snd, disks), 1266 instance), 1267 False) 1268 1269 if result.fail_msg: 1270 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s", 1271 node_name, result.fail_msg) 1272 else: 1273 for idx, success in enumerate(result.payload): 1274 if not success: 1275 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'" 1276 " failed", idx, instance.name)
1277
1278 1279 -def ImageDisks(lu, instance, image, disks=None):
1280 """Dumps an image onto an instance disk. 1281 1282 @type lu: L{LogicalUnit} 1283 @param lu: the logical unit on whose behalf we execute 1284 @type instance: L{objects.Instance} 1285 @param instance: the instance whose disks we should create 1286 @type image: string 1287 @param image: the image whose disks we should create 1288 @type disks: None or list of ints 1289 @param disks: disk indices 1290 1291 """ 1292 node_uuid = instance.primary_node 1293 node_name = lu.cfg.GetNodeName(node_uuid) 1294 1295 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid) 1296 if disks is None: 1297 disks = [(0, inst_disks[0])] 1298 else: 1299 disks = map(lambda idx: (idx, inst_disks[idx]), disks) 1300 1301 logging.info("Pausing synchronization of disks of instance '%s'", 1302 instance.name) 1303 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid, 1304 (map(compat.snd, disks), 1305 instance), 1306 True) 1307 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name) 1308 1309 for idx, success in enumerate(result.payload): 1310 if not success: 1311 logging.warn("Pausing synchronization of disk %s of instance '%s'" 1312 " failed", idx, instance.name) 1313 1314 try: 1315 for (idx, device) in disks: 1316 lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'", 1317 idx, instance.name, node_name) 1318 1319 result = lu.rpc.call_blockdev_image(node_uuid, (device, instance), 1320 image, device.size) 1321 result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" % 1322 (idx, instance.name, node_name)) 1323 finally: 1324 logging.info("Resuming synchronization of disks for instance '%s'", 1325 instance.name) 1326 1327 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid, 1328 (map(compat.snd, disks), 1329 instance), 1330 False) 1331 1332 if result.fail_msg: 1333 lu.LogWarning("Failed to resume disk synchronization for instance '%s' on" 1334 " node '%s'", node_name, result.fail_msg) 1335 else: 1336 for idx, success in enumerate(result.payload): 1337 if not success: 1338 lu.LogWarning("Failed to resume synchronization of disk '%d' of" 1339 " instance '%s'", idx, instance.name)
1340
1341   
1342  def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None): 
1343    """Wrapper for L{WipeDisks} that handles errors. 
1344   
1345    @type lu: L{LogicalUnit} 
1346    @param lu: the logical unit on whose behalf we execute 
1347    @type instance: L{objects.Instance} 
1348    @param instance: the instance whose disks we should wipe 
1349    @param disks: see L{WipeDisks} 
1350    @param cleanup: the result returned by L{CreateDisks}, used for cleanup in 
1351        case of error 
1352    @raise errors.OpPrereqError: in case of failure 
1353   
1354    """ 
1355    try: 
1356      WipeDisks(lu, instance, disks=disks) 
1357    except errors.OpExecError: 
1358      logging.warning("Wiping disks for instance '%s' failed", 
1359                      instance.name) 
1360      _UndoCreateDisks(lu, cleanup, instance) 
1361      raise 
1362
1363   
1364  def ExpandCheckDisks(instance_disks, disks): 
1365    """Return the instance disks selected by the disks list. 
1366   
1367    @type disks: list of L{objects.Disk} or None 
1368    @param disks: selected disks 
1369    @rtype: list of L{objects.Disk} 
1370    @return: selected instance disks to act on 
1371   
1372    """ 
1373    if disks is None: 
1374      return instance_disks 
1375    else: 
1376      inst_disks_uuids = [d.uuid for d in instance_disks] 
1377      disks_uuids = [d.uuid for d in disks] 
1378      if not set(disks_uuids).issubset(inst_disks_uuids): 
1379        raise errors.ProgrammerError("Can only act on disks belonging to the" 
1380                                     " target instance: expected a subset of %s," 
1381                                     " got %s" % (inst_disks_uuids, disks_uuids)) 
1382      return disks 
1383
1384 1385 -def WaitForSync(lu, instance, disks=None, oneshot=False):
1386 """Sleep and poll for an instance's disk to sync. 1387 1388 """ 1389 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid) 1390 if not inst_disks or disks is not None and not disks: 1391 return True 1392 1393 disks = ExpandCheckDisks(inst_disks, disks) 1394 1395 if not oneshot: 1396 lu.LogInfo("Waiting for instance %s to sync disks", instance.name) 1397 1398 node_uuid = instance.primary_node 1399 node_name = lu.cfg.GetNodeName(node_uuid) 1400 1401 # TODO: Convert to utils.Retry 1402 1403 retries = 0 1404 degr_retries = 10 # in seconds, as we sleep 1 second each time 1405 while True: 1406 max_time = 0 1407 done = True 1408 cumul_degraded = False 1409 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance)) 1410 msg = rstats.fail_msg 1411 if msg: 1412 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg) 1413 retries += 1 1414 if retries >= 10: 1415 raise errors.RemoteError("Can't contact node %s for mirror data," 1416 " aborting." % node_name) 1417 time.sleep(6) 1418 continue 1419 rstats = rstats.payload 1420 retries = 0 1421 for i, mstat in enumerate(rstats): 1422 if mstat is None: 1423 lu.LogWarning("Can't compute data for node %s/%s", 1424 node_name, disks[i].iv_name) 1425 continue 1426 1427 cumul_degraded = (cumul_degraded or 1428 (mstat.is_degraded and mstat.sync_percent is None)) 1429 if mstat.sync_percent is not None: 1430 done = False 1431 if mstat.estimated_time is not None: 1432 rem_time = ("%s remaining (estimated)" % 1433 utils.FormatSeconds(mstat.estimated_time)) 1434 max_time = mstat.estimated_time 1435 else: 1436 rem_time = "no time estimate" 1437 max_time = 5 # sleep at least a bit between retries 1438 lu.LogInfo("- device %s: %5.2f%% done, %s", 1439 disks[i].iv_name, mstat.sync_percent, rem_time) 1440 1441 # if we're done but degraded, let's do a few small retries, to 1442 # make sure we see a stable and not transient situation; therefore 1443 # we force restart of the loop 1444 if (done or oneshot) and cumul_degraded and degr_retries > 0: 1445 logging.info("Degraded disks found, %d retries left", degr_retries) 1446 degr_retries -= 1 1447 time.sleep(1) 1448 continue 1449 1450 if done or oneshot: 1451 break 1452 1453 time.sleep(min(60, max_time)) 1454 1455 if done: 1456 lu.LogInfo("Instance %s's disks are in sync", instance.name) 1457 1458 return not cumul_degraded
1459
1460   
1461  def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): 
1462    """Shutdown block devices of an instance. 
1463   
1464    This does the shutdown on all nodes of the instance. 
1465   
1466    Errors on the primary node are ignored only if ignore_primary is 
1467    true. 
1468   
1469    Modifies the configuration of the instance, so the caller should re-read the 
1470    instance configuration, if needed. 
1471   
1472    """ 
1473    all_result = True 
1474   
1475    if disks is None: 
1476      # only mark instance disks as inactive if all disks are affected 
1477      lu.cfg.MarkInstanceDisksInactive(instance.uuid) 
1478    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid) 
1479    disks = ExpandCheckDisks(inst_disks, disks) 
1480   
1481    for disk in disks: 
1482      for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node): 
1483        result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance)) 
1484        msg = result.fail_msg 
1485        if msg: 
1486          lu.LogWarning("Could not shutdown block device %s on node %s: %s", 
1487                        disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg) 
1488          if ((node_uuid == instance.primary_node and not ignore_primary) or 
1489              (node_uuid != instance.primary_node and not result.offline)): 
1490            all_result = False 
1491    return all_result 
1492
1493   
1494  def _SafeShutdownInstanceDisks(lu, instance, disks=None, req_states=None): 
1495    """Shutdown block devices of an instance. 
1496   
1497    This function checks if an instance is running, before calling 
1498    L{ShutdownInstanceDisks}. 
1499   
1500    """ 
1501    if req_states is None: 
1502      req_states = INSTANCE_DOWN 
1503    CheckInstanceState(lu, instance, req_states, msg="cannot shutdown disks") 
1504    ShutdownInstanceDisks(lu, instance, disks=disks) 
1505
1506 1507 -def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, 1508 ignore_size=False):
1509 """Prepare the block devices for an instance. 1510 1511 This sets up the block devices on all nodes. 1512 1513 Modifies the configuration of the instance, so the caller should re-read the 1514 instance configuration, if needed. 1515 1516 @type lu: L{LogicalUnit} 1517 @param lu: the logical unit on whose behalf we execute 1518 @type instance: L{objects.Instance} 1519 @param instance: the instance for whose disks we assemble 1520 @type disks: list of L{objects.Disk} or None 1521 @param disks: which disks to assemble (or all, if None) 1522 @type ignore_secondaries: boolean 1523 @param ignore_secondaries: if true, errors on secondary nodes 1524 won't result in an error return from the function 1525 @type ignore_size: boolean 1526 @param ignore_size: if true, the current known size of the disk 1527 will not be used during the disk activation, useful for cases 1528 when the size is wrong 1529 @return: False if the operation failed, otherwise a list of 1530 (host, instance_visible_name, node_visible_name) 1531 with the mapping from node devices to instance devices 1532 1533 """ 1534 device_info = [] 1535 disks_ok = True 1536 1537 if disks is None: 1538 # only mark instance disks as active if all disks are affected 1539 instance = lu.cfg.MarkInstanceDisksActive(instance.uuid) 1540 1541 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid) 1542 disks = ExpandCheckDisks(inst_disks, disks) 1543 1544 # With the two passes mechanism we try to reduce the window of 1545 # opportunity for the race condition of switching DRBD to primary 1546 # before handshaking occured, but we do not eliminate it 1547 1548 # The proper fix would be to wait (with some limits) until the 1549 # connection has been made and drbd transitions from WFConnection 1550 # into any other network-connected state (Connected, SyncTarget, 1551 # SyncSource, etc.) 
1552 1553 # 1st pass, assemble on all nodes in secondary mode 1554 for idx, inst_disk in enumerate(disks): 1555 for node_uuid, node_disk in inst_disk.ComputeNodeTree( 1556 instance.primary_node): 1557 if ignore_size: 1558 node_disk = node_disk.Copy() 1559 node_disk.UnsetSize() 1560 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance), 1561 instance, False, idx) 1562 msg = result.fail_msg 1563 if msg: 1564 secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance.uuid) 1565 is_offline_secondary = (node_uuid in secondary_nodes and 1566 result.offline) 1567 lu.LogWarning("Could not prepare block device %s on node %s" 1568 " (is_primary=False, pass=1): %s", 1569 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg) 1570 if not (ignore_secondaries or is_offline_secondary): 1571 disks_ok = False 1572 1573 # FIXME: race condition on drbd migration to primary 1574 1575 # 2nd pass, do only the primary node 1576 for idx, inst_disk in enumerate(disks): 1577 dev_path = None 1578 1579 for node_uuid, node_disk in inst_disk.ComputeNodeTree( 1580 instance.primary_node): 1581 if node_uuid != instance.primary_node: 1582 continue 1583 if ignore_size: 1584 node_disk = node_disk.Copy() 1585 node_disk.UnsetSize() 1586 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance), 1587 instance, True, idx) 1588 msg = result.fail_msg 1589 if msg: 1590 lu.LogWarning("Could not prepare block device %s on node %s" 1591 " (is_primary=True, pass=2): %s", 1592 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg) 1593 disks_ok = False 1594 else: 1595 dev_path, _, __ = result.payload 1596 1597 device_info.append((lu.cfg.GetNodeName(instance.primary_node), 1598 inst_disk.iv_name, dev_path)) 1599 1600 if not disks_ok: 1601 lu.cfg.MarkInstanceDisksInactive(instance.uuid) 1602 1603 return disks_ok, device_info
1604
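A standalone sketch of the two-pass ordering used by AssembleInstanceDisks (invented names, no Ganeti calls): every node first assembles the device in secondary mode, and only then is the device opened on the primary node, which narrows (but does not close) the DRBD race window described in the comments above.

def assemble_two_pass(disks, nodes_of, primary, assemble):
  """assemble(node, disk, as_primary) returns True on success."""
  ok = True
  # 1st pass: bring the device up on every involved node in secondary mode
  for disk in disks:
    for node in nodes_of(disk):
      ok = assemble(node, disk, False) and ok
  # 2nd pass: open the device, on the primary node only
  for disk in disks:
    ok = assemble(primary, disk, True) and ok
  return ok

assert assemble_two_pass(["disk0"], lambda d: ["node1", "node2"], "node1",
                         lambda node, disk, as_primary: True)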
1605 1606 -def StartInstanceDisks(lu, instance, force):
1607 """Start the disks of an instance. 1608 1609 Modifies the configuration of the instance, so the caller should re-read the 1610 instance configuration, if needed. 1611 1612 """ 1613 disks_ok, _ = AssembleInstanceDisks(lu, instance, 1614 ignore_secondaries=force) 1615 if not disks_ok: 1616 ShutdownInstanceDisks(lu, instance) 1617 if force is not None and not force: 1618 lu.LogWarning("", 1619 hint=("If the message above refers to a secondary node," 1620 " you can retry the operation using '--force'")) 1621 raise errors.OpExecError("Disk consistency error")
1622
1623 1624 -class LUInstanceGrowDisk(LogicalUnit):
1625 """Grow a disk of an instance. 1626 1627 """ 1628 HPATH = "disk-grow" 1629 HTYPE = constants.HTYPE_INSTANCE 1630 REQ_BGL = False 1631
1632 - def ExpandNames(self):
1633 self._ExpandAndLockInstance() 1634 self.needed_locks[locking.LEVEL_NODE] = [] 1635 self.needed_locks[locking.LEVEL_NODE_RES] = [] 1636 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE 1637 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE 1638 self.dont_collate_locks[locking.LEVEL_NODE] = True 1639 self.dont_collate_locks[locking.LEVEL_NODE_RES] = True
1640
1641 - def DeclareLocks(self, level):
1642 if level == locking.LEVEL_NODE: 1643 self._LockInstancesNodes() 1644 elif level == locking.LEVEL_NODE_RES: 1645 # Copy node locks 1646 self.needed_locks[locking.LEVEL_NODE_RES] = \ 1647 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1648
1649 - def BuildHooksEnv(self):
1650 """Build hooks env. 1651 1652 This runs on the master, the primary and all the secondaries. 1653 1654 """ 1655 env = { 1656 "DISK": self.op.disk, 1657 "AMOUNT": self.op.amount, 1658 "ABSOLUTE": self.op.absolute, 1659 } 1660 env.update(BuildInstanceHookEnvByObject(self, self.instance)) 1661 return env
1662
1663 - def BuildHooksNodes(self):
1664 """Build hooks nodes. 1665 1666 """ 1667 nl = [self.cfg.GetMasterNode()] + \ 1668 list(self.cfg.GetInstanceNodes(self.instance.uuid)) 1669 return (nl, nl)
1670
1671 - def CheckPrereq(self):
1672 """Check prerequisites. 1673 1674 This checks that the instance is in the cluster. 1675 1676 """ 1677 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid) 1678 assert self.instance is not None, \ 1679 "Cannot retrieve locked instance %s" % self.op.instance_name 1680 node_uuids = list(self.cfg.GetInstanceNodes(self.instance.uuid)) 1681 for node_uuid in node_uuids: 1682 CheckNodeOnline(self, node_uuid) 1683 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids) 1684 1685 if self.instance.disk_template not in constants.DTS_GROWABLE: 1686 raise errors.OpPrereqError("Instance's disk layout does not support" 1687 " growing", errors.ECODE_INVAL) 1688 1689 self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk)) 1690 1691 if self.op.absolute: 1692 self.target = self.op.amount 1693 self.delta = self.target - self.disk.size 1694 if self.delta < 0: 1695 raise errors.OpPrereqError("Requested size (%s) is smaller than " 1696 "current disk size (%s)" % 1697 (utils.FormatUnit(self.target, "h"), 1698 utils.FormatUnit(self.disk.size, "h")), 1699 errors.ECODE_STATE) 1700 else: 1701 self.delta = self.op.amount 1702 self.target = self.disk.size + self.delta 1703 if self.delta < 0: 1704 raise errors.OpPrereqError("Requested increment (%s) is negative" % 1705 utils.FormatUnit(self.delta, "h"), 1706 errors.ECODE_INVAL) 1707 1708 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta)) 1709 1710 self._CheckIPolicy(self.target)
1711
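The absolute/relative arithmetic in CheckPrereq above reduces to the following standalone sketch (sizes in MiB; ValueError stands in for OpPrereqError):

def compute_grow(current_size, amount, absolute):
  """Return (delta, target) for a grow request."""
  if absolute:
    target = amount
    delta = target - current_size
  else:
    delta = amount
    target = current_size + delta
  if delta < 0:
    raise ValueError("disks can only grow, not shrink")
  return delta, target

assert compute_grow(1024, 512, False) == (512, 1536)   # grow by 512 MiB
assert compute_grow(1024, 2048, True) == (1024, 2048)  # grow to 2048 MiB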
1712 - def _CheckDiskSpace(self, node_uuids, req_vgspace):
1713 template = self.instance.disk_template 1714 if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and 1715 not any(self.node_es_flags.values())): 1716 # TODO: check the free disk space for file, when that feature will be 1717 # supported 1718 # With exclusive storage we need to do something smarter than just looking 1719 # at free space, which, in the end, is basically a dry run. So we rely on 1720 # the dry run performed in Exec() instead. 1721 CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1722
1723 - def _CheckIPolicy(self, target_size):
1724 cluster = self.cfg.GetClusterInfo() 1725 group_uuid = list(self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, 1726 primary_only=True))[0] 1727 group_info = self.cfg.GetNodeGroup(group_uuid) 1728 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, 1729 group_info) 1730 1731 disk_sizes = [disk.size if disk.uuid != self.disk.uuid else target_size 1732 for disk in self.cfg.GetInstanceDisks(self.op.instance_uuid)] 1733 1734 # The ipolicy checker below ignores None, so we only give it the disk size 1735 res = ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, 1736 self.instance.disk_template) 1737 if res: 1738 msg = ("Growing disk %s violates policy: %s" % 1739 (self.op.disk, 1740 utils.CommaJoin(res))) 1741 if self.op.ignore_ipolicy: 1742 self.LogWarning(msg) 1743 else: 1744 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1745
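The ipolicy check above is fed the would-be disk sizes, with only the grown disk replaced by its target size; a standalone sketch of that substitution (plain tuples instead of Disk objects):

def sizes_after_grow(disks, grown_uuid, target_size):
  """disks: iterable of (uuid, size); return the sizes to validate."""
  return [size if uuid != grown_uuid else target_size
          for (uuid, size) in disks]

assert sizes_after_grow([("a", 10240), ("b", 20480)], "b", 51200) == \
  [10240, 51200]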
1746 - def Exec(self, feedback_fn):
1747 """Execute disk grow. 1748 1749 """ 1750 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) 1751 assert (self.owned_locks(locking.LEVEL_NODE) == 1752 self.owned_locks(locking.LEVEL_NODE_RES)) 1753 1754 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks 1755 1756 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk]) 1757 if not disks_ok: 1758 raise errors.OpExecError("Cannot activate block device to grow") 1759 1760 feedback_fn("Growing disk %s of instance '%s' by %s to %s" % 1761 (self.op.disk, self.instance.name, 1762 utils.FormatUnit(self.delta, "h"), 1763 utils.FormatUnit(self.target, "h"))) 1764 1765 # First run all grow ops in dry-run mode 1766 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid) 1767 for node_uuid in inst_nodes: 1768 result = self.rpc.call_blockdev_grow(node_uuid, 1769 (self.disk, self.instance), 1770 self.delta, True, True, 1771 self.node_es_flags[node_uuid]) 1772 result.Raise("Dry-run grow request failed to node %s" % 1773 self.cfg.GetNodeName(node_uuid)) 1774 1775 if wipe_disks: 1776 # Get disk size from primary node for wiping 1777 result = self.rpc.call_blockdev_getdimensions( 1778 self.instance.primary_node, [([self.disk], self.instance)]) 1779 result.Raise("Failed to retrieve disk size from node '%s'" % 1780 self.instance.primary_node) 1781 1782 (disk_dimensions, ) = result.payload 1783 1784 if disk_dimensions is None: 1785 raise errors.OpExecError("Failed to retrieve disk size from primary" 1786 " node '%s'" % self.instance.primary_node) 1787 (disk_size_in_bytes, _) = disk_dimensions 1788 1789 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes) 1790 1791 assert old_disk_size >= self.disk.size, \ 1792 ("Retrieved disk size too small (got %s, should be at least %s)" % 1793 (old_disk_size, self.disk.size)) 1794 else: 1795 old_disk_size = None 1796 1797 # We know that (as far as we can test) operations across different 1798 # nodes will succeed, time to run it for real on the backing storage 1799 for node_uuid in inst_nodes: 1800 result = self.rpc.call_blockdev_grow(node_uuid, 1801 (self.disk, self.instance), 1802 self.delta, False, True, 1803 self.node_es_flags[node_uuid]) 1804 result.Raise("Grow request failed to node %s" % 1805 self.cfg.GetNodeName(node_uuid)) 1806 1807 # And now execute it for logical storage, on the primary node 1808 node_uuid = self.instance.primary_node 1809 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance), 1810 self.delta, False, False, 1811 self.node_es_flags[node_uuid]) 1812 result.Raise("Grow request failed to node %s" % 1813 self.cfg.GetNodeName(node_uuid)) 1814 1815 self.disk.RecordGrow(self.delta) 1816 self.cfg.Update(self.instance, feedback_fn) 1817 self.cfg.Update(self.disk, feedback_fn) 1818 1819 # Changes have been recorded, release node lock 1820 ReleaseLocks(self, locking.LEVEL_NODE) 1821 1822 # Downgrade lock while waiting for sync 1823 self.WConfdClient().DownGradeLocksLevel( 1824 locking.LEVEL_NAMES[locking.LEVEL_INSTANCE]) 1825 1826 assert wipe_disks ^ (old_disk_size is None) 1827 1828 if wipe_disks: 1829 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 1830 assert inst_disks[self.op.disk] == self.disk 1831 1832 # Wipe newly added disk space 1833 WipeDisks(self, self.instance, 1834 disks=[(self.op.disk, self.disk, old_disk_size)]) 1835 1836 if self.op.wait_for_sync: 1837 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk]) 1838 if disk_abort: 1839 self.LogWarning("Disk syncing has not 
returned a good status; check" 1840 " the instance") 1841 if not self.instance.disks_active: 1842 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk]) 1843 elif not self.instance.disks_active: 1844 self.LogWarning("Not shutting down the disk even if the instance is" 1845 " not supposed to be running because no wait for" 1846 " sync mode was requested") 1847 1848 assert self.owned_locks(locking.LEVEL_NODE_RES) 1849 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1850
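Exec() above grows the disk in three stages. A standalone sketch of that ordering (hypothetical grow callback, no RPC): a dry run on every node first, so a failure aborts before anything has changed, then the real grow of the backing storage everywhere, and finally the logical (non-backing-store) grow on the primary only.

def grow_everywhere(nodes, primary, grow):
  """grow(node, dryrun, backingstore); raises on failure."""
  for node in nodes:            # 1) dry run, nothing modified yet
    grow(node, True, True)
  for node in nodes:            # 2) grow the backing storage for real
    grow(node, False, True)
  grow(primary, False, False)   # 3) grow the logical layer on the primary

calls = []
grow_everywhere(["node1", "node2"], "node1",
                lambda node, dryrun, backingstore:
                  calls.append((node, dryrun, backingstore)))
assert calls[0] == ("node1", True, True)
assert calls[-1] == ("node1", False, False)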
1851 1852 -class LUInstanceReplaceDisks(LogicalUnit):
1853 """Replace the disks of an instance. 1854 1855 """ 1856 HPATH = "mirrors-replace" 1857 HTYPE = constants.HTYPE_INSTANCE 1858 REQ_BGL = False 1859
1860 - def CheckArguments(self):
1861 """Check arguments. 1862 1863 """ 1864 if self.op.mode == constants.REPLACE_DISK_CHG: 1865 if self.op.remote_node is None and self.op.iallocator is None: 1866 raise errors.OpPrereqError("When changing the secondary either an" 1867 " iallocator script must be used or the" 1868 " new node given", errors.ECODE_INVAL) 1869 else: 1870 CheckIAllocatorOrNode(self, "iallocator", "remote_node") 1871 1872 elif self.op.remote_node is not None or self.op.iallocator is not None: 1873 # Not replacing the secondary 1874 raise errors.OpPrereqError("The iallocator and new node options can" 1875 " only be used when changing the" 1876 " secondary node", errors.ECODE_INVAL)
1877
1878 - def ExpandNames(self):
1879 self._ExpandAndLockInstance() 1880 1881 assert locking.LEVEL_NODE not in self.needed_locks 1882 assert locking.LEVEL_NODE_RES not in self.needed_locks 1883 assert locking.LEVEL_NODEGROUP not in self.needed_locks 1884 1885 assert self.op.iallocator is None or self.op.remote_node is None, \ 1886 "Conflicting options" 1887 1888 if self.op.remote_node is not None: 1889 (self.op.remote_node_uuid, self.op.remote_node) = \ 1890 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, 1891 self.op.remote_node) 1892 1893 # Warning: do not remove the locking of the new secondary here 1894 # unless DRBD8Dev.AddChildren is changed to work in parallel; 1895 # currently it doesn't since parallel invocations of 1896 # FindUnusedMinor will conflict 1897 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid] 1898 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND 1899 else: 1900 self.needed_locks[locking.LEVEL_NODE] = [] 1901 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE 1902 1903 if self.op.iallocator is not None: 1904 # iallocator will select a new node in the same group 1905 self.needed_locks[locking.LEVEL_NODEGROUP] = [] 1906 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET 1907 1908 self.needed_locks[locking.LEVEL_NODE_RES] = [] 1909 1910 self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True 1911 self.dont_collate_locks[locking.LEVEL_NODE] = True 1912 self.dont_collate_locks[locking.LEVEL_NODE_RES] = True 1913 1914 self.replacer = TLReplaceDisks(self, self.op.instance_uuid, 1915 self.op.instance_name, self.op.mode, 1916 self.op.iallocator, self.op.remote_node_uuid, 1917 self.op.disks, self.op.early_release, 1918 self.op.ignore_ipolicy) 1919 1920 self.tasklets = [self.replacer]
1921
1922 - def DeclareLocks(self, level):
1923 if level == locking.LEVEL_NODEGROUP: 1924 assert self.op.remote_node_uuid is None 1925 assert self.op.iallocator is not None 1926 assert not self.needed_locks[locking.LEVEL_NODEGROUP] 1927 1928 self.share_locks[locking.LEVEL_NODEGROUP] = 1 1929 # Lock all groups used by instance optimistically; this requires going 1930 # via the node before it's locked, requiring verification later on 1931 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 1932 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid) 1933 1934 elif level == locking.LEVEL_NODE: 1935 if self.op.iallocator is not None: 1936 assert self.op.remote_node_uuid is None 1937 assert not self.needed_locks[locking.LEVEL_NODE] 1938 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) 1939 1940 # Lock member nodes of all locked groups 1941 self.needed_locks[locking.LEVEL_NODE] = \ 1942 [node_uuid 1943 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) 1944 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members] 1945 else: 1946 self._LockInstancesNodes() 1947 1948 elif level == locking.LEVEL_NODE_RES: 1949 # Reuse node locks 1950 self.needed_locks[locking.LEVEL_NODE_RES] = \ 1951 self.needed_locks[locking.LEVEL_NODE]
1952
1953 - def BuildHooksEnv(self):
1954 """Build hooks env. 1955 1956 This runs on the master, the primary and all the secondaries. 1957 1958 """ 1959 instance = self.replacer.instance 1960 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 1961 env = { 1962 "MODE": self.op.mode, 1963 "NEW_SECONDARY": self.op.remote_node, 1964 "OLD_SECONDARY": self.cfg.GetNodeName(secondary_nodes[0]), 1965 } 1966 env.update(BuildInstanceHookEnvByObject(self, instance)) 1967 return env
1968
1969 - def BuildHooksNodes(self):
1970 """Build hooks nodes. 1971 1972 """ 1973 instance = self.replacer.instance 1974 nl = [ 1975 self.cfg.GetMasterNode(), 1976 instance.primary_node, 1977 ] 1978 if self.op.remote_node_uuid is not None: 1979 nl.append(self.op.remote_node_uuid) 1980 return nl, nl
1981
1982 - def CheckPrereq(self):
1983 """Check prerequisites. 1984 1985 """ 1986 # Verify if node group locks are still correct 1987 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) 1988 if owned_groups: 1989 CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups) 1990 1991 return LogicalUnit.CheckPrereq(self)
1992
1993 1994 -class LUInstanceActivateDisks(NoHooksLU):
1995 """Bring up an instance's disks. 1996 1997 """ 1998 REQ_BGL = False 1999
2000 - def ExpandNames(self):
2001 self._ExpandAndLockInstance() 2002 self.needed_locks[locking.LEVEL_NODE] = [] 2003 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2004
2005 - def DeclareLocks(self, level):
2006 if level == locking.LEVEL_NODE: 2007 self._LockInstancesNodes()
2008
2009 - def CheckPrereq(self):
2010 """Check prerequisites. 2011 2012 This checks that the instance is in the cluster. 2013 2014 """ 2015 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid) 2016 assert self.instance is not None, \ 2017 "Cannot retrieve locked instance %s" % self.op.instance_name 2018 CheckNodeOnline(self, self.instance.primary_node)
2019
2020 - def Exec(self, feedback_fn):
2021 """Activate the disks. 2022 2023 """ 2024 disks_ok, disks_info = \ 2025 AssembleInstanceDisks(self, self.instance, 2026 ignore_size=self.op.ignore_size) 2027 if not disks_ok: 2028 raise errors.OpExecError("Cannot activate block devices") 2029 2030 if self.op.wait_for_sync: 2031 if not WaitForSync(self, self.instance): 2032 self.cfg.MarkInstanceDisksInactive(self.instance.uuid) 2033 raise errors.OpExecError("Some disks of the instance are degraded!") 2034 2035 return disks_info
2036
2037 2038 -class LUInstanceDeactivateDisks(NoHooksLU):
2039 """Shutdown an instance's disks. 2040 2041 """ 2042 REQ_BGL = False 2043
2044 - def ExpandNames(self):
2045 self._ExpandAndLockInstance() 2046 self.needed_locks[locking.LEVEL_NODE] = [] 2047 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2048
2049 - def DeclareLocks(self, level):
2050 if level == locking.LEVEL_NODE: 2051 self._LockInstancesNodes()
2052
2053 - def CheckPrereq(self):
2054 """Check prerequisites. 2055 2056 This checks that the instance is in the cluster. 2057 2058 """ 2059 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid) 2060 assert self.instance is not None, \ 2061 "Cannot retrieve locked instance %s" % self.op.instance_name
2062
2063 - def Exec(self, feedback_fn):
2064 """Deactivate the disks 2065 2066 """ 2067 if self.op.force: 2068 ShutdownInstanceDisks(self, self.instance) 2069 else: 2070 _SafeShutdownInstanceDisks(self, self.instance)
2071
2072 2073 -def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary, 2074 ldisk=False):
2075 """Check that mirrors are not degraded. 2076 2077 @attention: The device has to be annotated already. 2078 2079 The ldisk parameter, if True, will change the test from the 2080 is_degraded attribute (which represents overall non-ok status for 2081 the device(s)) to the ldisk (representing the local storage status). 2082 2083 """ 2084 result = True 2085 2086 if on_primary or dev.AssembleOnSecondary(): 2087 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance)) 2088 msg = rstats.fail_msg 2089 if msg: 2090 lu.LogWarning("Can't find disk on node %s: %s", 2091 lu.cfg.GetNodeName(node_uuid), msg) 2092 result = False 2093 elif not rstats.payload: 2094 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid)) 2095 result = False 2096 else: 2097 if ldisk: 2098 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY 2099 else: 2100 result = result and not rstats.payload.is_degraded 2101 2102 if dev.children: 2103 for child in dev.children: 2104 result = result and _CheckDiskConsistencyInner(lu, instance, child, 2105 node_uuid, on_primary) 2106 2107 return result
2108
2109 2110 -def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
2111 """Wrapper around L{_CheckDiskConsistencyInner}. 2112 2113 """ 2114 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg) 2115 return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary, 2116 ldisk=ldisk)
2117
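A standalone sketch of the health test applied to each blockdev_find result above (the namedtuple and the LDS_OKAY value are stand-ins for the real payload and constants.LDS_OKAY): with ldisk the stricter local-storage status is required, otherwise only the overall is_degraded flag matters.

import collections

BlockdevStatus = collections.namedtuple("BlockdevStatus",
                                        ["ldisk_status", "is_degraded"])
LDS_OKAY = 1  # stand-in value for constants.LDS_OKAY

def device_healthy(status, ldisk):
  if ldisk:
    return status.ldisk_status == LDS_OKAY
  return not status.is_degraded

# locally fine but degraded overall, e.g. while resyncing to a peer
status = BlockdevStatus(ldisk_status=LDS_OKAY, is_degraded=True)
assert device_healthy(status, ldisk=True)
assert not device_healthy(status, ldisk=False)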
2118 2119 -def _BlockdevFind(lu, node_uuid, dev, instance):
2120 """Wrapper around call_blockdev_find to annotate diskparams. 2121 2122 @param lu: A reference to the lu object 2123 @param node_uuid: The node to call out 2124 @param dev: The device to find 2125 @param instance: The instance object the device belongs to 2126 @returns The result of the rpc call 2127 2128 """ 2129 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg) 2130 return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
2131
2132 2133 -def _GenerateUniqueNames(lu, exts):
2134 """Generate a suitable LV name. 2135 2136 This will generate a logical volume name for the given instance. 2137 2138 """ 2139 results = [] 2140 for val in exts: 2141 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) 2142 results.append("%s%s" % (new_id, val)) 2143 return results
2144
2145 2146 -class TLReplaceDisks(Tasklet):
2147 """Replaces disks for an instance. 2148 2149 Note: Locking is not within the scope of this class. 2150 2151 """
2152 - def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name, 2153 remote_node_uuid, disks, early_release, ignore_ipolicy):
2154 """Initializes this class. 2155 2156 """ 2157 Tasklet.__init__(self, lu) 2158 2159 # Parameters 2160 self.instance_uuid = instance_uuid 2161 self.instance_name = instance_name 2162 self.mode = mode 2163 self.iallocator_name = iallocator_name 2164 self.remote_node_uuid = remote_node_uuid 2165 self.disks = disks 2166 self.early_release = early_release 2167 self.ignore_ipolicy = ignore_ipolicy 2168 2169 # Runtime data 2170 self.instance = None 2171 self.new_node_uuid = None 2172 self.target_node_uuid = None 2173 self.other_node_uuid = None 2174 self.remote_node_info = None 2175 self.node_secondary_ip = None
2176 2177 @staticmethod
2178 - def _RunAllocator(lu, iallocator_name, instance_uuid, 2179 relocate_from_node_uuids):
2180 """Compute a new secondary node using an IAllocator. 2181 2182 """ 2183 req = iallocator.IAReqRelocate( 2184 inst_uuid=instance_uuid, 2185 relocate_from_node_uuids=list(relocate_from_node_uuids)) 2186 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req) 2187 2188 ial.Run(iallocator_name) 2189 2190 if not ial.success: 2191 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" 2192 " %s" % (iallocator_name, ial.info), 2193 errors.ECODE_NORES) 2194 2195 remote_node_name = ial.result[0] 2196 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name) 2197 2198 if remote_node is None: 2199 raise errors.OpPrereqError("Node %s not found in configuration" % 2200 remote_node_name, errors.ECODE_NOENT) 2201 2202 lu.LogInfo("Selected new secondary for instance '%s': %s", 2203 instance_uuid, remote_node_name) 2204 2205 return remote_node.uuid
2206
2207 - def _FindFaultyDisks(self, node_uuid):
2208 """Wrapper for L{FindFaultyInstanceDisks}. 2209 2210 """ 2211 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, 2212 node_uuid, True)
2213
2214 - def _CheckDisksActivated(self, instance):
2215 """Checks if the instance disks are activated. 2216 2217 @param instance: The instance to check disks 2218 @return: True if they are activated, False otherwise 2219 2220 """ 2221 node_uuids = self.cfg.GetInstanceNodes(instance.uuid) 2222 2223 for idx, dev in enumerate(self.cfg.GetInstanceDisks(instance.uuid)): 2224 for node_uuid in node_uuids: 2225 self.lu.LogInfo("Checking disk/%d on %s", idx, 2226 self.cfg.GetNodeName(node_uuid)) 2227 2228 result = _BlockdevFind(self, node_uuid, dev, instance) 2229 2230 if result.offline: 2231 continue 2232 elif result.fail_msg or not result.payload: 2233 return False 2234 2235 return True
2236
2237 - def CheckPrereq(self):
2238 """Check prerequisites. 2239 2240 This checks that the instance is in the cluster. 2241 2242 """ 2243 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid) 2244 assert self.instance is not None, \ 2245 "Cannot retrieve locked instance %s" % self.instance_name 2246 2247 if self.instance.disk_template != constants.DT_DRBD8: 2248 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" 2249 " instances", errors.ECODE_INVAL) 2250 2251 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid) 2252 if len(secondary_nodes) != 1: 2253 raise errors.OpPrereqError("The instance has a strange layout," 2254 " expected one secondary but found %d" % 2255 len(secondary_nodes), 2256 errors.ECODE_FAULT) 2257 2258 secondary_node_uuid = secondary_nodes[0] 2259 2260 if self.iallocator_name is None: 2261 remote_node_uuid = self.remote_node_uuid 2262 else: 2263 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name, 2264 self.instance.uuid, 2265 secondary_nodes) 2266 2267 if remote_node_uuid is None: 2268 self.remote_node_info = None 2269 else: 2270 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \ 2271 "Remote node '%s' is not locked" % remote_node_uuid 2272 2273 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid) 2274 assert self.remote_node_info is not None, \ 2275 "Cannot retrieve locked node %s" % remote_node_uuid 2276 2277 if remote_node_uuid == self.instance.primary_node: 2278 raise errors.OpPrereqError("The specified node is the primary node of" 2279 " the instance", errors.ECODE_INVAL) 2280 2281 if remote_node_uuid == secondary_node_uuid: 2282 raise errors.OpPrereqError("The specified node is already the" 2283 " secondary node of the instance", 2284 errors.ECODE_INVAL) 2285 2286 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, 2287 constants.REPLACE_DISK_CHG): 2288 raise errors.OpPrereqError("Cannot specify disks to be replaced", 2289 errors.ECODE_INVAL) 2290 2291 if self.mode == constants.REPLACE_DISK_AUTO: 2292 if not self._CheckDisksActivated(self.instance): 2293 raise errors.OpPrereqError("Please run activate-disks on instance %s" 2294 " first" % self.instance_name, 2295 errors.ECODE_STATE) 2296 faulty_primary = self._FindFaultyDisks(self.instance.primary_node) 2297 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid) 2298 2299 if faulty_primary and faulty_secondary: 2300 raise errors.OpPrereqError("Instance %s has faulty disks on more than" 2301 " one node and can not be repaired" 2302 " automatically" % self.instance_name, 2303 errors.ECODE_STATE) 2304 2305 if faulty_primary: 2306 self.disks = faulty_primary 2307 self.target_node_uuid = self.instance.primary_node 2308 self.other_node_uuid = secondary_node_uuid 2309 check_nodes = [self.target_node_uuid, self.other_node_uuid] 2310 elif faulty_secondary: 2311 self.disks = faulty_secondary 2312 self.target_node_uuid = secondary_node_uuid 2313 self.other_node_uuid = self.instance.primary_node 2314 check_nodes = [self.target_node_uuid, self.other_node_uuid] 2315 else: 2316 self.disks = [] 2317 check_nodes = [] 2318 2319 else: 2320 # Non-automatic modes 2321 if self.mode == constants.REPLACE_DISK_PRI: 2322 self.target_node_uuid = self.instance.primary_node 2323 self.other_node_uuid = secondary_node_uuid 2324 check_nodes = [self.target_node_uuid, self.other_node_uuid] 2325 2326 elif self.mode == constants.REPLACE_DISK_SEC: 2327 self.target_node_uuid = secondary_node_uuid 2328 self.other_node_uuid = self.instance.primary_node 2329 check_nodes = 
[self.target_node_uuid, self.other_node_uuid] 2330 2331 elif self.mode == constants.REPLACE_DISK_CHG: 2332 self.new_node_uuid = remote_node_uuid 2333 self.other_node_uuid = self.instance.primary_node 2334 self.target_node_uuid = secondary_node_uuid 2335 check_nodes = [self.new_node_uuid, self.other_node_uuid] 2336 2337 CheckNodeNotDrained(self.lu, remote_node_uuid) 2338 CheckNodeVmCapable(self.lu, remote_node_uuid) 2339 2340 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid) 2341 assert old_node_info is not None 2342 if old_node_info.offline and not self.early_release: 2343 # doesn't make sense to delay the release 2344 self.early_release = True 2345 self.lu.LogInfo("Old secondary %s is offline, automatically enabling" 2346 " early-release mode", secondary_node_uuid) 2347 2348 else: 2349 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % 2350 self.mode) 2351 2352 # If not specified all disks should be replaced 2353 if not self.disks: 2354 self.disks = range(len(self.instance.disks)) 2355 2356 # TODO: This is ugly, but right now we can't distinguish between internal 2357 # submitted opcode and external one. We should fix that. 2358 if self.remote_node_info: 2359 # We change the node, lets verify it still meets instance policy 2360 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) 2361 cluster = self.cfg.GetClusterInfo() 2362 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, 2363 new_group_info) 2364 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, 2365 self.remote_node_info, self.cfg, 2366 ignore=self.ignore_ipolicy) 2367 2368 for node_uuid in check_nodes: 2369 CheckNodeOnline(self.lu, node_uuid) 2370 2371 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid, 2372 self.other_node_uuid, 2373 self.target_node_uuid] 2374 if node_uuid is not None) 2375 2376 # Release unneeded node and node resource locks 2377 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) 2378 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) 2379 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) 2380 2381 # Release any owned node group 2382 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) 2383 2384 # Check whether disks are valid 2385 for disk_idx in self.disks: 2386 self.instance.FindDisk(disk_idx) 2387 2388 # Get secondary node IP addresses 2389 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node) 2390 in self.cfg.GetMultiNodeInfo(touched_nodes))
2391
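For the explicit (non-auto) modes, CheckPrereq above assigns the node roles as in this standalone sketch (the mode strings are invented stand-ins for constants.REPLACE_DISK_PRI/SEC/CHG; the auto mode instead derives target/other from whichever side reports faulty disks):

def pick_nodes(mode, primary, secondary, remote=None):
  """Return (target_node, other_node, new_node) for a replace-disks mode."""
  if mode == "replace-on-primary":       # REPLACE_DISK_PRI
    return primary, secondary, None
  if mode == "replace-on-secondary":     # REPLACE_DISK_SEC
    return secondary, primary, None
  if mode == "replace-new-secondary":    # REPLACE_DISK_CHG
    return secondary, primary, remote
  raise ValueError("unhandled mode %r" % mode)

assert pick_nodes("replace-on-primary", "p", "s") == ("p", "s", None)
assert pick_nodes("replace-new-secondary", "p", "s", "n") == ("s", "p", "n")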
2392 - def Exec(self, feedback_fn):
2393 """Execute disk replacement. 2394 2395 This dispatches the disk replacement to the appropriate handler. 2396 2397 """ 2398 if __debug__: 2399 # Verify owned locks before starting operation 2400 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) 2401 assert set(owned_nodes) == set(self.node_secondary_ip), \ 2402 ("Incorrect node locks, owning %s, expected %s" % 2403 (owned_nodes, self.node_secondary_ip.keys())) 2404 assert (self.lu.owned_locks(locking.LEVEL_NODE) == 2405 self.lu.owned_locks(locking.LEVEL_NODE_RES)) 2406 2407 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) 2408 assert list(owned_instances) == [self.instance_name], \ 2409 "Instance '%s' not locked" % self.instance_name 2410 2411 if not self.disks: 2412 feedback_fn("No disks need replacement for instance '%s'" % 2413 self.instance.name) 2414 return 2415 2416 feedback_fn("Replacing disk(s) %s for instance '%s'" % 2417 (utils.CommaJoin(self.disks), self.instance.name)) 2418 feedback_fn("Current primary node: %s" % 2419 self.cfg.GetNodeName(self.instance.primary_node)) 2420 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid) 2421 feedback_fn("Current secondary node: %s" % 2422 utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes))) 2423 2424 activate_disks = not self.instance.disks_active 2425 2426 # Activate the instance disks if we're replacing them on a down instance 2427 if activate_disks: 2428 StartInstanceDisks(self.lu, self.instance, True) 2429 # Re-read the instance object modified by the previous call 2430 self.instance = self.cfg.GetInstanceInfo(self.instance.uuid) 2431 2432 try: 2433 # Should we replace the secondary node? 2434 if self.new_node_uuid is not None: 2435 fn = self._ExecDrbd8Secondary 2436 else: 2437 fn = self._ExecDrbd8DiskOnly 2438 2439 result = fn(feedback_fn) 2440 finally: 2441 # Deactivate the instance disks if we're replacing them on a 2442 # down instance 2443 if activate_disks: 2444 _SafeShutdownInstanceDisks(self.lu, self.instance, 2445 req_states=INSTANCE_NOT_RUNNING) 2446 2447 assert not self.lu.owned_locks(locking.LEVEL_NODE) 2448 2449 if __debug__: 2450 # Verify owned locks 2451 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES) 2452 nodes = frozenset(self.node_secondary_ip) 2453 assert ((self.early_release and not owned_nodes) or 2454 (not self.early_release and not (set(owned_nodes) - nodes))), \ 2455 ("Not owning the correct locks, early_release=%s, owned=%r," 2456 " nodes=%r" % (self.early_release, owned_nodes, nodes)) 2457 2458 return result
2459
2460 - def _CheckVolumeGroup(self, node_uuids):
2461 self.lu.LogInfo("Checking volume groups") 2462 2463 vgname = self.cfg.GetVGName() 2464 2465 # Make sure volume group exists on all involved nodes 2466 results = self.rpc.call_vg_list(node_uuids) 2467 if not results: 2468 raise errors.OpExecError("Can't list volume groups on the nodes") 2469 2470 for node_uuid in node_uuids: 2471 res = results[node_uuid] 2472 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid)) 2473 if vgname not in res.payload: 2474 raise errors.OpExecError("Volume group '%s' not found on node %s" % 2475 (vgname, self.cfg.GetNodeName(node_uuid)))
2476
2477 - def _CheckDisksExistence(self, node_uuids):
2478 # Check disk existence 2479 for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)): 2480 if idx not in self.disks: 2481 continue 2482 2483 for node_uuid in node_uuids: 2484 self.lu.LogInfo("Checking disk/%d on %s", idx, 2485 self.cfg.GetNodeName(node_uuid)) 2486 2487 result = _BlockdevFind(self, node_uuid, dev, self.instance) 2488 2489 msg = result.fail_msg 2490 if msg or not result.payload: 2491 if not msg: 2492 msg = "disk not found" 2493 if not self._CheckDisksActivated(self.instance): 2494 extra_hint = ("\nDisks seem to be not properly activated. Try" 2495 " running activate-disks on the instance before" 2496 " using replace-disks.") 2497 else: 2498 extra_hint = "" 2499 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" % 2500 (idx, self.cfg.GetNodeName(node_uuid), msg, 2501 extra_hint))
2502
2503 - def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2504 for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)): 2505 if idx not in self.disks: 2506 continue 2507 2508 self.lu.LogInfo("Checking disk/%d consistency on node %s" % 2509 (idx, self.cfg.GetNodeName(node_uuid))) 2510 2511 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid, 2512 on_primary, ldisk=ldisk): 2513 raise errors.OpExecError("Node %s has degraded storage, unsafe to" 2514 " replace disks for instance %s" % 2515 (self.cfg.GetNodeName(node_uuid), 2516 self.instance.name))
2517
2518 - def _CreateNewStorage(self, node_uuid):
2519 """Create new storage on the primary or secondary node. 2520 2521 This is only used for same-node replaces, not for changing the 2522 secondary node, hence we don't want to modify the existing disk. 2523 2524 """ 2525 iv_names = {} 2526 2527 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 2528 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg) 2529 for idx, dev in enumerate(disks): 2530 if idx not in self.disks: 2531 continue 2532 2533 self.lu.LogInfo("Adding storage on %s for disk/%d", 2534 self.cfg.GetNodeName(node_uuid), idx) 2535 2536 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] 2537 names = _GenerateUniqueNames(self.lu, lv_names) 2538 2539 (data_disk, meta_disk) = dev.children 2540 vg_data = data_disk.logical_id[0] 2541 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size, 2542 logical_id=(vg_data, names[0]), 2543 params=data_disk.params) 2544 vg_meta = meta_disk.logical_id[0] 2545 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN, 2546 size=constants.DRBD_META_SIZE, 2547 logical_id=(vg_meta, names[1]), 2548 params=meta_disk.params) 2549 2550 new_lvs = [lv_data, lv_meta] 2551 old_lvs = [child.Copy() for child in dev.children] 2552 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) 2553 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid) 2554 2555 # we pass force_create=True to force the LVM creation 2556 for new_lv in new_lvs: 2557 try: 2558 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True, 2559 GetInstanceInfoText(self.instance), False, 2560 excl_stor) 2561 except errors.DeviceCreationError, e: 2562 raise errors.OpExecError("Can't create block device: %s" % e.message) 2563 2564 return iv_names
2565
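The LV names built in _CreateNewStorage follow the pattern <unique-id>.disk<idx>_data / _meta; a standalone sketch of that naming, with uuid.uuid4() as a stand-in for the cluster's GenerateUniqueID():

import uuid

def new_lv_names(disk_index):
  # one fresh unique id per logical volume, mirroring _GenerateUniqueNames
  return ["%s.disk%d_%s" % (uuid.uuid4(), disk_index, suffix)
          for suffix in ("data", "meta")]

data_name, meta_name = new_lv_names(0)
assert data_name.endswith(".disk0_data") and meta_name.endswith(".disk0_meta")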
2566 - def _CheckDevices(self, node_uuid, iv_names):
2567 for name, (dev, _, _) in iv_names.iteritems(): 2568 result = _BlockdevFind(self, node_uuid, dev, self.instance) 2569 2570 msg = result.fail_msg 2571 if msg or not result.payload: 2572 if not msg: 2573 msg = "disk not found" 2574 raise errors.OpExecError("Can't find DRBD device %s: %s" % 2575 (name, msg)) 2576 2577 if result.payload.is_degraded: 2578 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2579
2580 - def _RemoveOldStorage(self, node_uuid, iv_names):
2581 for name, (_, old_lvs, _) in iv_names.iteritems(): 2582 self.lu.LogInfo("Remove logical volumes for %s", name) 2583 2584 for lv in old_lvs: 2585 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \ 2586 .fail_msg 2587 if msg: 2588 self.lu.LogWarning("Can't remove old LV: %s", msg, 2589 hint="remove unused LVs manually")
2590
2591 - def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2592 """Replace a disk on the primary or secondary for DRBD 8. 2593 2594 The algorithm for replace is quite complicated: 2595 2596 1. for each disk to be replaced: 2597 2598 1. create new LVs on the target node with unique names 2599 1. detach old LVs from the drbd device 2600 1. rename old LVs to name_replaced.<time_t> 2601 1. rename new LVs to old LVs 2602 1. attach the new LVs (with the old names now) to the drbd device 2603 2604 1. wait for sync across all devices 2605 2606 1. for each modified disk: 2607 2608 1. remove old LVs (which have the name name_replaces.<time_t>) 2609 2610 Failures are not very well handled. 2611 2612 """ 2613 steps_total = 6 2614 2615 # Step: check device activation 2616 self.lu.LogStep(1, steps_total, "Check device existence") 2617 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid]) 2618 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid]) 2619 2620 # Step: check other node consistency 2621 self.lu.LogStep(2, steps_total, "Check peer consistency") 2622 self._CheckDisksConsistency( 2623 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node, 2624 False) 2625 2626 # Step: create new storage 2627 self.lu.LogStep(3, steps_total, "Allocate new storage") 2628 iv_names = self._CreateNewStorage(self.target_node_uuid) 2629 2630 # Step: for each lv, detach+rename*2+attach 2631 self.lu.LogStep(4, steps_total, "Changing drbd configuration") 2632 for dev, old_lvs, new_lvs in iv_names.itervalues(): 2633 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name) 2634 2635 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, 2636 (dev, self.instance), 2637 (old_lvs, self.instance)) 2638 result.Raise("Can't detach drbd from local storage on node" 2639 " %s for device %s" % 2640 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name)) 2641 #dev.children = [] 2642 #cfg.Update(instance) 2643 2644 # ok, we created the new LVs, so now we know we have the needed 2645 # storage; as such, we proceed on the target node to rename 2646 # old_lv to _old, and new_lv to old_lv; note that we rename LVs 2647 # using the assumption that logical_id == unique_id on that node 2648 2649 # FIXME(iustin): use a better name for the replaced LVs 2650 temp_suffix = int(time.time()) 2651 ren_fn = lambda d, suff: (d.logical_id[0], 2652 d.logical_id[1] + "_replaced-%s" % suff) 2653 2654 # Build the rename list based on what LVs exist on the node 2655 rename_old_to_new = [] 2656 for to_ren in old_lvs: 2657 result = self.rpc.call_blockdev_find(self.target_node_uuid, 2658 (to_ren, self.instance)) 2659 if not result.fail_msg and result.payload: 2660 # device exists 2661 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) 2662 2663 self.lu.LogInfo("Renaming the old LVs on the target node") 2664 result = self.rpc.call_blockdev_rename(self.target_node_uuid, 2665 rename_old_to_new) 2666 result.Raise("Can't rename old LVs on node %s" % 2667 self.cfg.GetNodeName(self.target_node_uuid)) 2668 2669 # Now we rename the new LVs to the old LVs 2670 self.lu.LogInfo("Renaming the new LVs on the target node") 2671 rename_new_to_old = [(new, old.logical_id) 2672 for old, new in zip(old_lvs, new_lvs)] 2673 result = self.rpc.call_blockdev_rename(self.target_node_uuid, 2674 rename_new_to_old) 2675 result.Raise("Can't rename new LVs on node %s" % 2676 self.cfg.GetNodeName(self.target_node_uuid)) 2677 2678 # Intermediate steps of in memory modifications 2679 for old, new in zip(old_lvs, new_lvs): 2680 new.logical_id = 
old.logical_id 2681 2682 # We need to modify old_lvs so that removal later removes the 2683 # right LVs, not the newly added ones; note that old_lvs is a 2684 # copy here 2685 for disk in old_lvs: 2686 disk.logical_id = ren_fn(disk, temp_suffix) 2687 2688 # Now that the new lvs have the old name, we can add them to the device 2689 self.lu.LogInfo("Adding new mirror component on %s", 2690 self.cfg.GetNodeName(self.target_node_uuid)) 2691 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid, 2692 (dev, self.instance), 2693 (new_lvs, self.instance)) 2694 msg = result.fail_msg 2695 if msg: 2696 for new_lv in new_lvs: 2697 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid, 2698 (new_lv, self.instance)).fail_msg 2699 if msg2: 2700 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, 2701 hint=("cleanup manually the unused logical" 2702 "volumes")) 2703 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) 2704 2705 cstep = itertools.count(5) 2706 2707 if self.early_release: 2708 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2709 self._RemoveOldStorage(self.target_node_uuid, iv_names) 2710 # TODO: Check if releasing locks early still makes sense 2711 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) 2712 else: 2713 # Release all resource locks except those used by the instance 2714 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, 2715 keep=self.node_secondary_ip.keys()) 2716 2717 # Release all node locks while waiting for sync 2718 ReleaseLocks(self.lu, locking.LEVEL_NODE) 2719 2720 # TODO: Can the instance lock be downgraded here? Take the optional disk 2721 # shutdown in the caller into consideration. 2722 2723 # Wait for sync 2724 # This can fail as the old devices are degraded and _WaitForSync 2725 # does a combined result over all disks, so we don't check its return value 2726 self.lu.LogStep(cstep.next(), steps_total, "Sync devices") 2727 WaitForSync(self.lu, self.instance) 2728 2729 # Check all devices manually 2730 self._CheckDevices(self.instance.primary_node, iv_names) 2731 2732 # Step: remove old storage 2733 if not self.early_release: 2734 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2735 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2736
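The detach/rename/attach dance above hinges on two rename maps; a standalone sketch building them from plain LV names (the real code renames (vg, lv_name) logical_ids): the old LVs get a "_replaced-<timestamp>" suffix first, then the freshly created LVs take over the old names.

def rename_plan(old_lvs, new_lvs, timestamp):
  """Return (old_to_temp, new_to_old) rename lists."""
  old_to_temp = [(old, "%s_replaced-%d" % (old, timestamp)) for old in old_lvs]
  new_to_old = [(new, old) for old, new in zip(old_lvs, new_lvs)]
  return old_to_temp, new_to_old

old = ["uid1.disk0_data", "uid1.disk0_meta"]
new = ["uid2.disk0_data", "uid2.disk0_meta"]
old_to_temp, new_to_old = rename_plan(old, new, 1234567890)
assert old_to_temp[0] == ("uid1.disk0_data",
                          "uid1.disk0_data_replaced-1234567890")
assert new_to_old[0] == ("uid2.disk0_data", "uid1.disk0_data")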
2737 - def _ExecDrbd8Secondary(self, feedback_fn):
2738 """Replace the secondary node for DRBD 8. 2739 2740 The algorithm for replace is quite complicated: 2741 - for all disks of the instance: 2742 - create new LVs on the new node with same names 2743 - shutdown the drbd device on the old secondary 2744 - disconnect the drbd network on the primary 2745 - create the drbd device on the new secondary 2746 - network attach the drbd on the primary, using an artifice: 2747 the drbd code for Attach() will connect to the network if it 2748 finds a device which is connected to the good local disks but 2749 not network enabled 2750 - wait for sync across all devices 2751 - remove all disks from the old secondary 2752 2753 Failures are not very well handled. 2754 2755 """ 2756 steps_total = 6 2757 2758 pnode = self.instance.primary_node 2759 2760 # Step: check device activation 2761 self.lu.LogStep(1, steps_total, "Check device existence") 2762 self._CheckDisksExistence([self.instance.primary_node]) 2763 self._CheckVolumeGroup([self.instance.primary_node]) 2764 2765 # Step: check other node consistency 2766 self.lu.LogStep(2, steps_total, "Check peer consistency") 2767 self._CheckDisksConsistency(self.instance.primary_node, True, True) 2768 2769 # Step: create new storage 2770 self.lu.LogStep(3, steps_total, "Allocate new storage") 2771 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 2772 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg) 2773 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, 2774 self.new_node_uuid) 2775 for idx, dev in enumerate(disks): 2776 self.lu.LogInfo("Adding new local storage on %s for disk/%d" % 2777 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2778 # we pass force_create=True to force LVM creation 2779 for new_lv in dev.children: 2780 try: 2781 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance, 2782 new_lv, True, GetInstanceInfoText(self.instance), 2783 False, excl_stor) 2784 except errors.DeviceCreationError, e: 2785 raise errors.OpExecError("Can't create block device: %s" % e.message) 2786 2787 # Step 4: dbrd minors and drbd setups changes 2788 # after this, we must manually remove the drbd minors on both the 2789 # error and the success paths 2790 self.lu.LogStep(4, steps_total, "Changing drbd configuration") 2791 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid 2792 for _ in inst_disks], 2793 self.instance.uuid) 2794 logging.debug("Allocated minors %r", minors) 2795 2796 iv_names = {} 2797 for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)): 2798 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % 2799 (self.cfg.GetNodeName(self.new_node_uuid), idx)) 2800 # create new devices on new_node; note that we create two IDs: 2801 # one without port, so the drbd will be activated without 2802 # networking information on the new node at this stage, and one 2803 # with network, for the latter activation in step 4 2804 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id 2805 if self.instance.primary_node == o_node1: 2806 p_minor = o_minor1 2807 else: 2808 assert self.instance.primary_node == o_node2, "Three-node instance?" 
2809 p_minor = o_minor2 2810 2811 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None, 2812 p_minor, new_minor, o_secret) 2813 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port, 2814 p_minor, new_minor, o_secret) 2815 2816 iv_names[idx] = (dev, dev.children, new_net_id) 2817 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, 2818 new_net_id) 2819 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8, 2820 logical_id=new_alone_id, 2821 children=dev.children, 2822 size=dev.size, 2823 params={}) 2824 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd], 2825 self.cfg) 2826 try: 2827 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance, 2828 anno_new_drbd, 2829 GetInstanceInfoText(self.instance), False, 2830 excl_stor) 2831 except errors.GenericError: 2832 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2833 raise 2834 2835 # We have new devices, shutdown the drbd on the old secondary 2836 for idx, dev in enumerate(inst_disks): 2837 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) 2838 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid, 2839 (dev, self.instance)).fail_msg 2840 if msg: 2841 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" 2842 "node: %s" % (idx, msg), 2843 hint=("Please cleanup this device manually as" 2844 " soon as possible")) 2845 2846 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") 2847 result = self.rpc.call_drbd_disconnect_net( 2848 [pnode], (inst_disks, self.instance))[pnode] 2849 2850 msg = result.fail_msg 2851 if msg: 2852 # detaches didn't succeed (unlikely) 2853 self.cfg.ReleaseDRBDMinors(self.instance.uuid) 2854 raise errors.OpExecError("Can't detach the disks from the network on" 2855 " old node: %s" % (msg,)) 2856 2857 # if we managed to detach at least one, we update all the disks of 2858 # the instance to point to the new secondary 2859 self.lu.LogInfo("Updating instance configuration") 2860 for dev, _, new_logical_id in iv_names.itervalues(): 2861 dev.logical_id = new_logical_id 2862 self.cfg.Update(dev, feedback_fn) 2863 2864 self.cfg.Update(self.instance, feedback_fn) 2865 2866 # Release all node locks (the configuration has been updated) 2867 ReleaseLocks(self.lu, locking.LEVEL_NODE) 2868 2869 # and now perform the drbd attach 2870 self.lu.LogInfo("Attaching primary drbds to new secondary" 2871 " (standalone => connected)") 2872 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 2873 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, 2874 self.new_node_uuid], 2875 (inst_disks, self.instance), 2876 self.instance.name, 2877 False) 2878 for to_node, to_result in result.items(): 2879 msg = to_result.fail_msg 2880 if msg: 2881 raise errors.OpExecError( 2882 "Can't attach drbd disks on node %s: %s (please do a gnt-instance " 2883 "info %s to see the status of disks)" % 2884 (self.cfg.GetNodeName(to_node), msg, self.instance.name)) 2885 2886 cstep = itertools.count(5) 2887 2888 if self.early_release: 2889 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2890 self._RemoveOldStorage(self.target_node_uuid, iv_names) 2891 # TODO: Check if releasing locks early still makes sense 2892 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) 2893 else: 2894 # Release all resource locks except those used by the instance 2895 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, 2896 keep=self.node_secondary_ip.keys()) 2897 2898 # TODO: Can the instance lock be downgraded here? 
Take the optional disk 2899 # shutdown in the caller into consideration. 2900 2901 # Wait for sync 2902 # This can fail as the old devices are degraded and _WaitForSync 2903 # does a combined result over all disks, so we don't check its return value 2904 self.lu.LogStep(cstep.next(), steps_total, "Sync devices") 2905 WaitForSync(self.lu, self.instance) 2906 2907 # Check all devices manually 2908 self._CheckDevices(self.instance.primary_node, iv_names) 2909 2910 # Step: remove old storage 2911 if not self.early_release: 2912 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") 2913 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2914
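The two DRBD logical_ids built in step 4 above differ only in the port field; a standalone sketch of that construction, assuming the (nodeA, nodeB, port, minorA, minorB, secret) layout that the code unpacks:

def drbd_ids_for_new_secondary(primary, new_node, old_logical_id, new_minor):
  node_a, node_b, port, minor_a, minor_b, secret = old_logical_id
  p_minor = minor_a if primary == node_a else minor_b
  # without a port the device comes up standalone (no networking) ...
  alone_id = (primary, new_node, None, p_minor, new_minor, secret)
  # ... the id with the port is kept for the later network attach
  net_id = (primary, new_node, port, p_minor, new_minor, secret)
  return alone_id, net_id

old_id = ("node-A", "node-B", 11000, 0, 1, "secret")
alone, net = drbd_ids_for_new_secondary("node-A", "node-C", old_id, 5)
assert alone == ("node-A", "node-C", None, 0, 5, "secret")
assert net == ("node-A", "node-C", 11000, 0, 5, "secret")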
2915 2916 -class TemporaryDisk():
2917 """ Creates a new temporary bootable disk, and makes sure it is destroyed. 2918 2919 Is a context manager, and should be used with the ``with`` statement as such. 2920 2921 The disk is guaranteed to be created at index 0, shifting any other disks of 2922 the instance by one place, and allowing the instance to be booted with the 2923 content of the disk. 2924 2925 """ 2926
2927 - def __init__(self, lu, instance, disks, feedback_fn, 2928 shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
2929 """ Constructor storing arguments until used later. 2930 2931 @type lu: L{ganeti.cmdlib.base.LogicalUnit} 2932 @param lu: The LU within which this disk is created. 2933 2934 @type instance: L{ganeti.objects.Instance} 2935 @param instance: The instance to which the disk should be added 2936 2937 @type disks: list of triples (disk template, disk access mode, int) 2938 @param disks: 2939 disk specification, which is a list of triples containing the 2940 disk template (e.g., L{constants.DT_PLAIN}), the disk access 2941 mode (i.e., L{constants.DISK_RDONLY} or L{constants.DISK_RDWR}), 2942 and size in MiB. 2943 2944 @type feedback_fn: function 2945 @param feedback_fn: Function used to log progress 2946 2947 """ 2948 self._lu = lu 2949 self._instance = instance 2950 self._disks = disks 2951 self._feedback_fn = feedback_fn 2952 self._shutdown_timeout = shutdown_timeout
2953
2954 - def _EnsureInstanceDiskState(self):
2955 """ Ensures that the instance is down, and its disks inactive. 2956 2957 All the operations related to the creation and destruction of disks require 2958 that the instance is down and that the disks are inactive. This function is 2959 invoked to make it so. 2960 2961 """ 2962 # The instance needs to be down before any of these actions occur 2963 # Whether it is must be checked manually through a RPC - configuration 2964 # reflects only the desired state 2965 self._feedback_fn("Shutting down instance") 2966 result = self._lu.rpc.call_instance_shutdown(self._instance.primary_node, 2967 self._instance, 2968 self._shutdown_timeout, 2969 self._lu.op.reason) 2970 result.Raise("Shutdown of instance '%s' while removing temporary disk " 2971 "failed" % self._instance.name) 2972 2973 # Disks need to be deactivated prior to being removed 2974 # The disks_active configuration entry should match the actual state 2975 if self._instance.disks_active: 2976 self._feedback_fn("Deactivating disks") 2977 ShutdownInstanceDisks(self._lu, self._instance)
2978
2979 - def __enter__(self):
2980 """ Context manager entry function, creating the disk. 2981 2982 @rtype: L{ganeti.objects.Disk} 2983 @return: The disk object created. 2984 2985 """ 2986 self._EnsureInstanceDiskState() 2987 2988 new_disks = [] 2989 2990 # The iv_name of the disk intentionally diverges from Ganeti's standards, as 2991 # this disk should be very temporary and its presence should be reported. 2992 # With the special iv_name, gnt-cluster verify detects the disk and warns 2993 # the user of its presence. Removing the disk restores the instance to its 2994 # proper state, despite an error that appears when the removal is performed. 2995 for idx, (disk_template, disk_access, disk_size) in enumerate(self._disks): 2996 new_disk = objects.Disk() 2997 new_disk.dev_type = disk_template 2998 new_disk.mode = disk_access 2999 new_disk.uuid = self._lu.cfg.GenerateUniqueID(self._lu.proc.GetECId()) 3000 new_disk.logical_id = (self._lu.cfg.GetVGName(), new_disk.uuid) 3001 new_disk.params = {} 3002 new_disk.size = disk_size 3003 3004 new_disks.append(new_disk) 3005 3006 self._feedback_fn("Attempting to create temporary disk") 3007 3008 self._undoing_info = CreateDisks(self._lu, self._instance, disks=new_disks) 3009 for idx, new_disk in enumerate(new_disks): 3010 self._lu.cfg.AddInstanceDisk(self._instance.uuid, new_disk, idx=idx) 3011 self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid) 3012 3013 self._feedback_fn("Temporary disk created") 3014 3015 self._new_disks = new_disks 3016 3017 return new_disks
3018
3019 - def __exit__(self, exc_type, _value, _traceback):
3020 """ Context manager exit function, destroying the disk. 3021 3022 """ 3023 if exc_type: 3024 self._feedback_fn("Exception raised, cleaning up temporary disk") 3025 else: 3026 self._feedback_fn("Regular cleanup of temporary disk") 3027 3028 try: 3029 self._EnsureInstanceDiskState() 3030 3031 _UndoCreateDisks(self._lu, self._undoing_info, self._instance) 3032 3033 for disk in self._new_disks: 3034 self._lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid) 3035 self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid) 3036 3037 self._feedback_fn("Temporary disk removed") 3038 except: 3039 self._feedback_fn("Disk cleanup failed; it will have to be removed " 3040 "manually") 3041 raise
3042
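A hedged usage sketch for TemporaryDisk; it is not runnable on its own and assumes it is placed inside a LogicalUnit's Exec(), where self (the LU), self.instance and feedback_fn are available, as they are elsewhere in this module:

# illustrative only: a 2 GiB plain, read-write scratch disk at index 0
disk_spec = [(constants.DT_PLAIN, constants.DISK_RDWR, 2048)]
with TemporaryDisk(self, self.instance, disk_spec, feedback_fn) as disks:
  # the instance has been shut down and its disks deactivated; 'disks'
  # holds the newly created objects.Disk entries, inserted at index 0
  feedback_fn("Scratch disk %s attached" % disks[0].uuid)
  # ... boot the instance from the scratch disk, do the work ...
# on exit the disk is destroyed and removed from the configuration again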