Package ganeti :: Package cmdlib :: Module instance_storage
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.instance_storage

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with storage of instances.""" 
  32   
  33  import itertools 
  34  import logging 
  35  import os 
  36  import time 
  37   
  38  from ganeti import compat 
  39  from ganeti import constants 
  40  from ganeti import errors 
  41  from ganeti import ht 
  42  from ganeti import locking 
  43  from ganeti.masterd import iallocator 
  44  from ganeti import objects 
  45  from ganeti import utils 
  46  import ganeti.rpc.node as rpc 
  47  from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet 
  48  from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ 
  49    AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \ 
  50    ComputeIPolicyDiskSizesViolation, \ 
  51    CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \ 
  52    IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \ 
  53    CheckDiskTemplateEnabled 
  54  from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \ 
  55    CopyLockList, ReleaseLocks, CheckNodeVmCapable, \ 
  56    BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy 
  57   
  58  import ganeti.masterd.instance 
  59   
  60   
# Name fragment placed in front of ".disk<N>" when GenerateDiskTemplate builds
# the unique names of the disks of a non-DRBD template. Templates that are not
# listed here get no generated names there (the lookup yields None).
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }
68 69 70 -def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open, 71 excl_stor):
72 """Create a single block device on a given node. 73 74 This will not recurse over children of the device, so they must be 75 created in advance. 76 77 @param lu: the lu on whose behalf we execute 78 @param node_uuid: the node on which to create the device 79 @type instance: L{objects.Instance} 80 @param instance: the instance which owns the device 81 @type device: L{objects.Disk} 82 @param device: the device to create 83 @param info: the extra 'metadata' we should attach to the device 84 (this will be represented as a LVM tag) 85 @type force_open: boolean 86 @param force_open: this parameter will be passes to the 87 L{backend.BlockdevCreate} function where it specifies 88 whether we run on primary or not, and it affects both 89 the child assembly and the device own Open() execution 90 @type excl_stor: boolean 91 @param excl_stor: Whether exclusive_storage is active for the node 92 93 """ 94 result = lu.rpc.call_blockdev_create(node_uuid, (device, instance), 95 device.size, instance.name, force_open, 96 info, excl_stor) 97 result.Raise("Can't create block device %s on" 98 " node %s for instance %s" % (device, 99 lu.cfg.GetNodeName(node_uuid), 100 instance.name))
101
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices, as (node_uuid, device) pairs
  @raise errors.DeviceCreationError: if creating this device or one of its
      children fails; the exception carries the devices created so far

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError as e:
    # A child failed: add what this level created so the caller can undo it.
    # The bare raise re-raises the same exception with its traceback intact.
    e.created_devices.extend(created_devices)
    raise
  except errors.OpExecError as e:
    raise errors.DeviceCreationError(str(e), created_devices)
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)

  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  The root device is annotated with its disk parameters and the node's
  exclusive_storage setting is looked up before the device tree is created.

  @return: whatever L{_CreateBlockDevInner} returns for the annotated device

  """
  annotated = AnnotateDiskParams(instance, [device], lu.cfg)
  # One device went in, so exactly one annotated device must come out
  (root_disk,) = annotated
  es_enabled = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, root_disk,
                              force_create, info, force_open, es_enabled)
195 196 -def _UndoCreateDisks(lu, disks_created, instance):
197 """Undo the work performed by L{CreateDisks}. 198 199 This function is called in case of an error to undo the work of 200 L{CreateDisks}. 201 202 @type lu: L{LogicalUnit} 203 @param lu: the logical unit on whose behalf we execute 204 @param disks_created: the result returned by L{CreateDisks} 205 @type instance: L{objects.Instance} 206 @param instance: the instance for which disks were created 207 208 """ 209 for (node_uuid, disk) in disks_created: 210 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance)) 211 result.Warn("Failed to remove newly-created disk %s on node %s" % 212 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
def CreateDisks(lu, instance, disk_template=None,
                to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Since the instance may not have been saved to the config file yet, this
  function can not query the config file for the instance's disks; in that
  case they need to be passed as an argument.

  This function is also used by the disk template conversion mechanism to
  create the new disks of the instance. Since the instance will have the
  old template at the time we create the new disks, the new template must
  be passed as an extra argument.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type disk_template: string
  @param disk_template: if provided, overrides the instance's disk_template
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpExecError: if the disks have a mixed template and no
      explicit template was given, or if creating a device fails (in which
      case the devices created so far are removed again first)

  """
  info = GetInstanceInfoText(instance)

  if disks is None:
    disks = lu.cfg.GetInstanceDisks(instance.uuid)

  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    # We cannot use config's 'GetInstanceNodes' here as 'CreateDisks'
    # is used by 'LUInstanceCreate' and the instance object is not
    # stored in the config yet.
    all_node_uuids = []
    for disk in disks:
      all_node_uuids.extend(disk.all_nodes)
    all_node_uuids = set(all_node_uuids)
    # ensure that primary node is always the first
    all_node_uuids.discard(pnode_uuid)
    all_node_uuids = [pnode_uuid] + list(all_node_uuids)
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disk_template is None:
    disk_template = utils.GetDiskTemplate(disks)
    if disk_template == constants.DT_MIXED:
      raise errors.OpExecError("Creating disk for '%s' instances "
                               "only possible with explicit disk template."
                               % (constants.DT_MIXED,))

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), disk_template)

  if disk_template in constants.DTS_FILEBASED:
    # The directory is derived from the path of the first disk only
    file_storage_dir = os.path.dirname(disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Creation and opening are only forced on the primary node
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError as e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Include the partially created tree in the rollback
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @type disk_template: string
  @param disk_template: the disk template the disks will be created with
  @type disks: list of dictionaries
  @param disks: the disk definitions; for the LVM-backed templates each one
      must carry the L{constants.IDISK_VG} and L{constants.IDISK_SIZE} keys
  @rtype: dict
  @return: the required space keyed by volume group name; empty for
      templates that do not allocate from a volume group
  @raise errors.ProgrammerError: if the disk template is not known here

  """
  def _compute(disks, payload):
    """Universal algorithm.

    Sums size plus the per-disk payload of all disks, grouped by VG.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Per-disk overhead added on top of the disk size, for the templates that
  # take their space from a volume group; None marks templates that do not.
  vg_payload = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: 0,
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: constants.DRBD_META_SIZE,
    constants.DT_FILE: None,
    constants.DT_SHARED_FILE: None,
    constants.DT_GLUSTER: None,
    }

  if disk_template not in vg_payload:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  payload = vg_payload[disk_template]
  if payload is None:
    return {}

  # Only now are the disks inspected, so a template without VG usage never
  # requires the VG key to be present in the disk dictionaries
  return _compute(disks, payload)
def ComputeDisks(disks, disk_template, default_vg):
  """Computes the instance disks.

  Every input dictionary is validated and turned into a normalized disk
  dictionary.

  @type disks: list of dictionaries
  @param disks: The disks' input dictionary
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @type default_vg: string
  @param default_vg: The default_vg to assume

  @return: The computed disks
  @raise errors.OpPrereqError: on an invalid access mode or a missing or
      non-integer size

  """
  # Optional keys which are copied verbatim when the input has them
  passthrough_keys = (
    constants.IDISK_METAVG,
    constants.IDISK_ADOPT,
    constants.IDISK_SPINDLES,
    )

  computed = []
  for spec in disks:
    access_mode = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if access_mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 access_mode, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size_value = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    CheckDiskExtProvider(spec, disk_template)

    volume_group = spec.get(constants.IDISK_VG, default_vg)
    disk_name = spec.get(constants.IDISK_NAME, None)
    # The literal "none" (in any case) means the disk has no name
    if disk_name is not None and disk_name.lower() == constants.VALUE_NONE:
      disk_name = None

    entry = {
      constants.IDISK_SIZE: size_value,
      constants.IDISK_MODE: access_mode,
      constants.IDISK_VG: volume_group,
      constants.IDISK_NAME: disk_name,
      constants.IDISK_TYPE: disk_template,
      }
    entry.update((key, spec[key]) for key in passthrough_keys if key in spec)

    # Add IDISK_ACCESS parameter for disk templates that support it
    if (disk_template in constants.DTS_HAVE_ACCESS and
        constants.IDISK_ACCESS in spec):
      entry[constants.IDISK_ACCESS] = spec[constants.IDISK_ACCESS]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if disk_template == constants.DT_EXT:
      entry[constants.IDISK_PROVIDER] = spec[constants.IDISK_PROVIDER]
      entry.update((key, spec[key]) for key in spec
                   if key not in constants.IDISK_PARAMS)

    computed.append(entry)

  return computed
def ComputeDisksInfo(disks, disk_template, default_vg, ext_params):
  """Computes the new instance's disks for the template conversion.

  This method is used by the disks template conversion mechanism. It uses
  the 'ComputeDisks' method as an auxiliary method to compute the disks that
  will be used for generating the new disk template of the instance. The
  size, mode and name parameters are taken from the instance's current
  disks, as are the volume group and the access parameter for the templates
  that support them. For conversions targeting an extstorage template, the
  mandatory provider's name and any user-provided extstorage parameters are
  also included in the result.

  @type disks: list of {objects.Disk}
  @param disks: The current disks of the instance
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @type default_vg: string
  @param default_vg: The default volume group to assume
  @type ext_params: dict
  @param ext_params: The extstorage parameters

  @rtype: list of dictionaries
  @return: The computed disks' information for the new template
  @raise errors.OpPrereqError: if an ExtStorage disk would be converted to
      the provider it already uses

  """
  # Ensure 'ext_params' does not violate existing disks' params
  for key in ext_params:
    if key != constants.IDISK_PROVIDER:
      assert key not in constants.IDISK_PARAMS, \
        "Invalid extstorage parameter '%s'" % key

  # Prepare the disks argument for the 'ComputeDisks' method.
  # items() rather than iteritems(): it behaves the same here and is also
  # available on Python 3 dictionaries.
  inst_disks = [dict((key, value) for key, value in disk.items()
                     if key in constants.IDISK_PARAMS)
                for disk in map(objects.Disk.ToDict, disks)]

  # Update disks with the user-provided 'ext_params'.
  for disk in inst_disks:
    disk.update(ext_params)

  # Compute the new disks' information.
  new_disks = ComputeDisks(inst_disks, disk_template, default_vg)

  # Add missing parameters to the previously computed disks.
  for disk, new_disk in zip(disks, new_disks):
    # Conversions between ExtStorage templates allowed only for different
    # providers.
    if (disk.dev_type == disk_template and
        disk_template == constants.DT_EXT):
      provider = new_disk[constants.IDISK_PROVIDER]
      if provider == disk.params[constants.IDISK_PROVIDER]:
        raise errors.OpPrereqError("Not converting, '%s' of type ExtStorage"
                                   " already using provider '%s'" %
                                   (disk.iv_name, provider), errors.ECODE_INVAL)

    # Add IDISK_ACCESS parameter for conversions between disk templates that
    # support it.
    if (disk_template in constants.DTS_HAVE_ACCESS and
        constants.IDISK_ACCESS in disk.params):
      new_disk[constants.IDISK_ACCESS] = disk.params[constants.IDISK_ACCESS]

    # For LVM-based conversions (plain <-> drbd) use the same volume group.
    if disk_template in constants.DTS_LVM:
      if disk.dev_type == constants.DT_PLAIN:
        new_disk[constants.IDISK_VG] = disk.logical_id[0]
      elif disk.dev_type == constants.DT_DRBD8:
        # The first child of a DRBD8 device is its data volume
        new_disk[constants.IDISK_VG] = disk.children[0].logical_id[0]

  return new_disks
def CalculateFileStorageDir(disk_type, cfg, instance_name,
                            file_storage_dir=None):
  """Calculate final instance file storage dir.

  @type disk_type: disk template
  @param disk_type: L{constants.DT_FILE}, L{constants.DT_SHARED_FILE}, or
      L{constants.DT_GLUSTER}

  @type cfg: ConfigWriter
  @param cfg: the configuration that is to be used.
  @type file_storage_dir: path
  @param file_storage_dir: the path below the configured base.
  @type instance_name: string
  @param instance_name: name of the instance this disk is for.

  @rtype: string
  @return: The file storage directory for the instance, or None if the
      disk type is not file-based
  @raise errors.OpPrereqError: if the cluster has no storage directory
      configured for the disk type

  """
  if disk_type not in constants.DTS_FILEBASED:
    return None

  # Look up the cluster-wide base directory of this storage type
  if disk_type == constants.DT_FILE:
    base_dir = cfg.GetFileStorageDir()
  elif disk_type == constants.DT_SHARED_FILE:
    base_dir = cfg.GetSharedFileStorageDir()
  elif disk_type == constants.DT_GLUSTER:
    base_dir = cfg.GetGlusterStorageDir()
  else:
    base_dir = None

  if not base_dir:
    raise errors.OpPrereqError(
      "Cluster file storage dir for {tpl} storage type not defined".format(
        tpl=repr(disk_type)
      ),
      errors.ECODE_STATE)

  # Build the full path: base, optional sub-directory, then (except for
  # Gluster) the instance name
  components = [base_dir]
  if file_storage_dir is not None:
    components.append(file_storage_dir)
  if disk_type != constants.DT_GLUSTER:
    components.append(instance_name)

  if len(components) == 1:
    return components[0]

  # pylint: disable=W0142
  return utils.PathJoin(*components)
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  Nothing is actually checked and nothing is returned.

  """
  # For the RADOS cluster we assume there is always enough space.
  return None
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, forthcoming=False):
  """Generate a drbd8 device complete with its children.

  The children are two plain volumes, one for the data and one for the DRBD
  metadata, in that order.

  @param lu: the logical unit whose configuration and execution context are
      used to allocate the port, secret, minors and unique IDs
  @param primary_uuid: UUID of the primary node
  @param secondary_uuid: UUID of the secondary node
  @param size: size of the data volume and of the DRBD device
  @param vgnames: volume groups of the data and the metadata volume
  @param names: names of the data and the metadata volume
  @param iv_name: the iv_name of the DRBD device
  @param forthcoming: stored as the forthcoming flag of all three disks
  @return: the DRBD8 L{objects.Disk} with both children attached

  """
  assert len(vgnames) == len(names) == 2

  def _NewPlainVolume(position, volume_size):
    """Build the plain child volume at the given position and give it a UUID.

    """
    volume = objects.Disk(dev_type=constants.DT_PLAIN, size=volume_size,
                          logical_id=(vgnames[position], names[position]),
                          nodes=[primary_uuid, secondary_uuid],
                          params={}, forthcoming=forthcoming)
    volume.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    return volume

  # The allocations below are done in a fixed order: port, secret, data
  # volume, metadata volume, DRBD UUID and finally the minors
  drbd_port = lu.cfg.AllocatePort()
  secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  data_volume = _NewPlainVolume(0, size)
  meta_volume = _NewPlainVolume(1, constants.DRBD_META_SIZE)

  drbd_uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  minors = lu.cfg.AllocateDRBDMinor([primary_uuid, secondary_uuid], drbd_uuid)
  assert len(minors) == 2

  drbd_logical_id = (primary_uuid, secondary_uuid, drbd_port,
                     minors[0], minors[1], secret)
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=drbd_logical_id,
                          children=[data_volume, meta_volume],
                          nodes=[primary_uuid, secondary_uuid],
                          iv_name=iv_name, params={},
                          forthcoming=forthcoming)
  drbd_dev.uuid = drbd_uuid
  return drbd_dev
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, forthcoming=False):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit whose configuration and execution context are
      used for name, ID, port and minor allocation
  @param template_name: the disk template to generate the disks for
  @param instance_uuid: UUID of the instance; only used to build the
      Gluster volume path
  @param primary_node_uuid: UUID of the primary node
  @param secondary_node_uuids: UUIDs of the secondary nodes; exactly one is
      required for DRBD8 and none is allowed for any other template
  @type disk_info: list of dictionaries
  @param disk_info: the disk definitions; size and mode are mandatory
  @param file_storage_dir: directory of the disks of file-based templates
  @param file_driver: first element of the logical ID of file-based and
      Gluster disks
  @param base_index: index of the first generated disk
  @param feedback_fn: called with a message for each generated disk (not
      called for the DRBD8 template)
  @param full_disk_params: the disk parameters from which the DRBD defaults
      are computed
  @param forthcoming: stored as the forthcoming flag of the generated disks
  @return: list of the generated L{objects.Disk} objects; empty for the
      diskless template
  @raise errors.ProgrammerError: on a wrong number of secondary nodes, an
      unknown template, or an ext disk without a provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two names per disk: data volume at 2 * idx, metadata at 2 * idx + 1
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      forthcoming=forthcoming)
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.dev_type = template_name
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])
    disk_nodes = []

    # Every branch defines logical_id_fn(idx, disk_index, disk), where idx is
    # the position in disk_info and disk_index the index within the instance
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

      disk_nodes = [primary_node_uuid]

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      # The name is selected through the idx argument, not through whatever
      # value a variable called idx has in the enclosing scope at call time
      logical_id_fn = \
        lambda idx, _1, _2: (file_driver,
                             "%s/%s" % (file_storage_dir,
                                        names[idx]))
      if template_name == constants.DT_FILE:
        disk_nodes = [primary_node_uuid]

    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      # Add IDISK_ACCESS param to disk params
      if (template_name in constants.DTS_HAVE_ACCESS and
          constants.IDISK_ACCESS in disk):
        params[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params, nodes=disk_nodes,
                              spindles=disk.get(constants.IDISK_SPINDLES),
                              forthcoming=forthcoming)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
def CommitDisks(disks):
  """Recursively remove the forthcoming flag

  @type disks: list of L{objects.Disk}, or None
  @param disks: the disks to commit; each of them and all of their
      descendants get C{forthcoming} set to False. None or an empty list
      is a no-op.

  """
  # A device's children may be None rather than an empty list, a case that
  # _CreateBlockDevInner also guards with "if device.children" -- presumably
  # this is what disks built without a children argument report; verify
  # against objects.Disk.
  for disk in disks or ():
    disk.forthcoming = False
    CommitDisks(disk.children)
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not
      be, or when they are required but missing

  """
  def _SpindlesGiven():
    """Whether the disk has a spindles value other than None.

    """
    return (constants.IDISK_SPINDLES in diskdict and
            diskdict[constants.IDISK_SPINDLES] is not None)

  if not es_flag:
    # Without exclusive storage, spindles must not be specified at all
    if _SpindlesGiven():
      raise errors.OpPrereqError("Spindles in instance disks cannot be"
                                 " specified"
                                 " when exclusive storage is not active",
                                 errors.ECODE_INVAL)
    return

  if required and not _SpindlesGiven():
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
def CheckDiskExtProvider(diskdict, disk_template):
  """Check that the given disk should or should not have the provider param.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type disk_template: string
  @param disk_template: the desired template of this disk
  @raise errors.OpPrereqError: when the parameter is used in the wrong way

  """
  provider = diskdict.get(constants.IDISK_PROVIDER, None)

  if disk_template == constants.DT_EXT:
    # The ext template cannot work without a provider
    if provider is None:
      raise errors.OpPrereqError("Missing provider for template '%s'" %
                                 constants.DT_EXT, errors.ECODE_INVAL)
  elif provider:
    raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                               " disk template, not %s" %
                               (constants.IDISK_PROVIDER, constants.DT_EXT,
                                disk_template), errors.ECODE_INVAL)
761 762 -class LUInstanceRecreateDisks(LogicalUnit):
763 """Recreate an instance's missing disks. 764 765 """ 766 HPATH = "instance-recreate-disks" 767 HTYPE = constants.HTYPE_INSTANCE 768 REQ_BGL = False 769 770 _MODIFYABLE = compat.UniqueFrozenset([ 771 constants.IDISK_SIZE, 772 constants.IDISK_MODE, 773 constants.IDISK_SPINDLES, 774 ]) 775 776 # New or changed disk parameters may have different semantics 777 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ 778 constants.IDISK_ADOPT, 779 780 # TODO: Implement support changing VG while recreating 781 constants.IDISK_VG, 782 constants.IDISK_METAVG, 783 constants.IDISK_PROVIDER, 784 constants.IDISK_NAME, 785 constants.IDISK_ACCESS, 786 constants.IDISK_TYPE, 787 ])) 788
789 - def _RunAllocator(self):
790 """Run the allocator based on input opcode. 791 792 """ 793 be_full = self.cfg.GetClusterInfo().FillBE(self.instance) 794 795 # FIXME 796 # The allocator should actually run in "relocate" mode, but current 797 # allocators don't support relocating all the nodes of an instance at 798 # the same time. As a workaround we use "allocate" mode, but this is 799 # suboptimal for two reasons: 800 # - The instance name passed to the allocator is present in the list of 801 # existing instances, so there could be a conflict within the 802 # internal structures of the allocator. This doesn't happen with the 803 # current allocators, but it's a liability. 804 # - The allocator counts the resources used by the instance twice: once 805 # because the instance exists already, and once because it tries to 806 # allocate a new instance. 807 # The allocator could choose some of the nodes on which the instance is 808 # running, but that's not a problem. If the instance nodes are broken, 809 # they should be already be marked as drained or offline, and hence 810 # skipped by the allocator. If instance disks have been lost for other 811 # reasons, then recreating the disks on the same nodes should be fine. 
812 spindle_use = be_full[constants.BE_SPINDLE_USE] 813 disk_template = self.cfg.GetInstanceDiskTemplate(self.instance.uuid) 814 disks = [{ 815 constants.IDISK_SIZE: d.size, 816 constants.IDISK_MODE: d.mode, 817 constants.IDISK_SPINDLES: d.spindles, 818 constants.IDISK_TYPE: d.dev_type 819 } for d in self.cfg.GetInstanceDisks(self.instance.uuid)] 820 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, 821 disk_template=disk_template, 822 group_name=None, 823 tags=list(self.instance.GetTags()), 824 os=self.instance.os, 825 nics=[{}], 826 vcpus=be_full[constants.BE_VCPUS], 827 memory=be_full[constants.BE_MAXMEM], 828 spindle_use=spindle_use, 829 disks=disks, 830 hypervisor=self.instance.hypervisor, 831 node_whitelist=None) 832 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 833 834 ial.Run(self.op.iallocator) 835 836 assert req.RequiredNodes() == \ 837 len(self.cfg.GetInstanceNodes(self.instance.uuid)) 838 839 if not ial.success: 840 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" 841 " %s" % (self.op.iallocator, ial.info), 842 errors.ECODE_NORES) 843 844 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result) 845 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", 846 self.op.instance_name, self.op.iallocator, 847 utils.CommaJoin(self.op.nodes))
848
849 - def CheckArguments(self):
850 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]): 851 # Normalize and convert deprecated list of disk indices 852 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] 853 854 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) 855 if duplicates: 856 raise errors.OpPrereqError("Some disks have been specified more than" 857 " once: %s" % utils.CommaJoin(duplicates), 858 errors.ECODE_INVAL) 859 860 # We don't want _CheckIAllocatorOrNode selecting the default iallocator 861 # when neither iallocator nor nodes are specified 862 if self.op.iallocator or self.op.nodes: 863 CheckIAllocatorOrNode(self, "iallocator", "nodes") 864 865 for (idx, params) in self.op.disks: 866 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) 867 unsupported = frozenset(params.keys()) - self._MODIFYABLE 868 if unsupported: 869 raise errors.OpPrereqError("Parameters for disk %s try to change" 870 " unmodifyable parameter(s): %s" % 871 (idx, utils.CommaJoin(unsupported)), 872 errors.ECODE_INVAL)
873
def ExpandNames(self):
  """Declare the locks needed for recreating the disks.

  The instance is locked and node locks are set to be appended. If nodes
  were given explicitly they are resolved and locked directly; otherwise
  the node lock list starts empty and, when an iallocator is used, an
  empty node group lock list is prepared as well.

  """
  self._ExpandAndLockInstance()
  self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

  if not self.op.nodes:
    self.needed_locks[locking.LEVEL_NODE] = []
    if self.op.iallocator:
      # iallocator will select a new node in the same group
      self.needed_locks[locking.LEVEL_NODEGROUP] = []
  else:
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
    self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)

  self.needed_locks[locking.LEVEL_NODE_RES] = []

  for lock_level in (locking.LEVEL_NODEGROUP,
                     locking.LEVEL_NODE,
                     locking.LEVEL_NODE_RES):
    self.dont_collate_locks[lock_level] = True
892
def DeclareLocks(self, level):
  """Fill in the lock lists for the given locking level.

  @param level: the locking level for which locks are being declared

  """
  if level == locking.LEVEL_NODEGROUP:
    assert self.op.iallocator is not None
    assert not self.op.nodes
    assert not self.needed_locks[locking.LEVEL_NODEGROUP]
    self.share_locks[locking.LEVEL_NODEGROUP] = 1
    # Lock the primary group used by the instance optimistically; this
    # requires going via the node before it's locked, requiring
    # verification later on
    primary_groups = \
      self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
    self.needed_locks[locking.LEVEL_NODEGROUP] = primary_groups
    return

  if level == locking.LEVEL_NODE:
    # If an allocator is used, then we lock all the nodes in the current
    # instance group, as we don't know yet which ones will be selected;
    # if we replace the nodes without using an allocator, locks are
    # already declared in ExpandNames; otherwise, we need to lock all the
    # instance nodes for disk re-creation
    if self.op.iallocator:
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODE]
      assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

      # Lock member nodes of the group of the primary node
      for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
        group_members = self.cfg.GetNodeGroup(group_uuid).members
        self.needed_locks[locking.LEVEL_NODE].extend(group_members)
    elif not self.op.nodes:
      self._LockInstancesNodes(primary_only=False)
    return

  if level == locking.LEVEL_NODE_RES:
    # Copy node locks
    node_locks = self.needed_locks[locking.LEVEL_NODE]
    self.needed_locks[locking.LEVEL_NODE_RES] = CopyLockList(node_locks)
927
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  @return: the environment computed by L{BuildInstanceHookEnvByObject}
      for C{self.instance}

  """
  hooks_env = BuildInstanceHookEnvByObject(self, self.instance)
  return hooks_env
935
def BuildHooksNodes(self):
  """Build hooks nodes.

  @return: a pair whose two members are the same list: the master node
      followed by all nodes of the instance

  """
  hook_nodes = [self.cfg.GetMasterNode()]
  hook_nodes.extend(self.cfg.GetInstanceNodes(self.instance.uuid))
  return (hook_nodes, hook_nodes)
943
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster and is not running.

  Besides the instance state, the number of replacement nodes, the
  requested disk indices and the spindles/exclusive-storage combination
  are verified. On success C{self.instance} and C{self.disks} (a dict
  mapping disk index to the requested parameter changes) are set. When an
  iallocator is used, it is run here and node locks that are not needed
  any more are released.

  @raise errors.OpPrereqError: if the node count does not match, the
      instance has no disks, a disk index is out of range, or only some
      disks are recreated while the nodes are being changed

  """
  instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
  assert instance is not None, \
    "Cannot retrieve locked instance %s" % self.op.instance_name
  if self.op.node_uuids:
    inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
    if len(self.op.node_uuids) != len(inst_nodes):
      raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                 " %d replacement nodes were specified" %
                                 (instance.name, len(inst_nodes),
                                  len(self.op.node_uuids)),
                                 errors.ECODE_INVAL)
    disks = self.cfg.GetInstanceDisks(instance.uuid)
    assert (not utils.AnyDiskOfType(disks, [constants.DT_DRBD8]) or
            len(self.op.node_uuids) == 2)
    assert (not utils.AnyDiskOfType(disks, [constants.DT_PLAIN]) or
            len(self.op.node_uuids) == 1)
    primary_node = self.op.node_uuids[0]
  else:
    primary_node = instance.primary_node
  if not self.op.iallocator:
    CheckNodeOnline(self, primary_node)

  if not instance.disks:
    raise errors.OpPrereqError("Instance '%s' has no disks" %
                               self.op.instance_name, errors.ECODE_INVAL)

  # Verify if node group locks are still correct
  owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
  if owned_groups:
    # Node group locks are acquired only for the primary node (and only
    # when the allocator is used)
    CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                            primary_only=True)

  # if we replace nodes *and* the old primary is offline, we don't
  # check the instance state
  old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
  if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
    CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                       msg="cannot recreate disks")

  disk_count = len(instance.disks)
  if self.op.disks:
    self.disks = dict(self.op.disks)
  else:
    self.disks = dict((idx, {}) for idx in range(disk_count))

  maxidx = max(self.disks.keys())
  if maxidx >= disk_count:
    raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                               errors.ECODE_INVAL)

  # Compare against a real list: a list never equals a range object on
  # Python 3, which would make this check reject every node change
  if ((self.op.node_uuids or self.op.iallocator) and
      sorted(self.disks.keys()) != list(range(disk_count))):
    raise errors.OpPrereqError("Can't recreate disks partially and"
                               " change the nodes at the same time",
                               errors.ECODE_INVAL)

  self.instance = instance

  if self.op.iallocator:
    self._RunAllocator()
    # Release unneeded node and node resource locks
    ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
    ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)

  if self.op.node_uuids:
    node_uuids = self.op.node_uuids
  else:
    node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
  excl_stor = compat.any(
    rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
    )
  for new_params in self.disks.values():
    CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
1024
def Exec(self, feedback_fn):
  """Recreate the disks.

  The changes for all selected disks are computed first and only applied
  afterwards, so that a failing assertion cannot leave the configuration
  half-updated. The disks are then created again and, if the cluster is
  configured to wipe disks, wiped.

  @param feedback_fn: function passed on to the configuration updates

  """
  assert (self.owned_locks(locking.LEVEL_NODE) ==
          self.owned_locks(locking.LEVEL_NODE_RES))

  to_skip = []
  pending = [] # keeps track of needed changes

  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  for idx, disk in enumerate(inst_disks):
    if idx not in self.disks:
      # Disk should not be recreated
      to_skip.append(idx)
      continue
    changes = self.disks[idx]

    new_id = None
    # update secondaries for disks, if needed
    if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
      # need to update the nodes and minors
      assert len(self.op.node_uuids) == 2
      # otherwise disk internals have changed
      assert len(disk.logical_id) == 6
      (_, _, old_port, _, _, old_secret) = disk.logical_id
      new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                              disk.uuid)
      new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                new_minors[0], new_minors[1], old_secret)
      assert len(disk.logical_id) == len(new_id)

    pending.append((idx, new_id, changes))

  # now that we have passed all asserts above, we can apply the mods
  # in a single run (to avoid partial changes)
  for (idx, new_id, changes) in pending:
    disk = inst_disks[idx]
    if new_id is not None:
      assert disk.dev_type == constants.DT_DRBD8
      disk.logical_id = new_id
    if changes:
      disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                  mode=changes.get(constants.IDISK_MODE, None),
                  spindles=changes.get(constants.IDISK_SPINDLES, None))
    self.cfg.Update(disk, feedback_fn)

  # change primary node, if needed
  if self.op.node_uuids:
    self.LogWarning("Changing the instance's nodes, you will have to"
                    " remove any disks left on the older nodes manually")
    self.instance.primary_node = self.op.node_uuids[0]
    self.cfg.Update(self.instance, feedback_fn)
    for disk in inst_disks:
      self.cfg.SetDiskNodes(disk.uuid, self.op.node_uuids)

  # All touched nodes must be locked
  owned_node_locks = self.owned_locks(locking.LEVEL_NODE)
  inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
  assert owned_node_locks.issuperset(frozenset(inst_nodes))
  new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

  # TODO: Release node locks before wiping, or explain why it's not possible
  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  if not self.cfg.GetClusterInfo().prealloc_wipe_disks:
    return

  wipedisks = []
  for (idx, disk) in enumerate(inst_disks):
    if idx not in to_skip:
      wipedisks.append((idx, disk, 0))
  WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                     cleanup=new_disks)
1097
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the C{call_node_info} RPC

  """
  storage_units = rpc.PrepareStorageUnitsForNodes(
    lu.cfg, [(constants.ST_LVM_VG, vg)], node_uuids)
  hv_name = lu.cfg.GetHypervisorType()
  cluster_hvparams = lu.cfg.GetClusterInfo().hvparams
  hv_specs = [(hv_name, cluster_hvparams[hv_name])]
  return lu.rpc.call_node_info(node_uuids, storage_units, hv_specs)
1118
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_name: string
  @param node_name: the name of the node
  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  vg_report = utils.storage.LookupSpaceInfoByStorageType(
    space_info, constants.ST_LVM_VG)
  if not vg_report:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)

  free_mib = vg_report.get("storage_free", None)
  if not isinstance(free_mib, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, free_mib),
                               errors.ECODE_ENVIRON)

  if requested > free_mib:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, free_mib),
                               errors.ECODE_NORES)
1151
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  A single node info call is made for all nodes; afterwards each node's
  answer is checked individually. If any node has less disk than
  requested, or its information cannot be retrieved, an OpPrereqError
  is raised.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  per_node_results = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    node_result = per_node_results[node_uuid]
    failure_text = "Cannot get current information from node %s" % node_name
    node_result.Raise(failure_text, prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, node_result.payload, vg, requested)
1180
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  Every volume group listed in C{req_sizes} is checked on all given
  nodes. If any node has less disk than required, or its information
  cannot be retrieved, an OpPrereqError is raised.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg_name in req_sizes:
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, req_sizes[vg_name])
1203
1204 1205 -def _DiskSizeInBytesToMebibytes(lu, size):
1206 """Converts a disk size in bytes to mebibytes. 1207 1208 Warns and rounds up if the size isn't an even multiple of 1 MiB. 1209 1210 """ 1211 (mib, remainder) = divmod(size, 1024 * 1024) 1212 1213 if remainder != 0: 1214 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" 1215 " to not overwrite existing data (%s bytes will not be" 1216 " wiped)", (1024 * 1024) - remainder) 1217 mib += 1 1218 1219 return mib
1220
1221 1222 -def _CalcEta(time_taken, written, total_size):
1223 """Calculates the ETA based on size written and total size. 1224 1225 @param time_taken: The time taken so far 1226 @param written: amount written so far 1227 @param total_size: The total size of data to be written 1228 @return: The remaining time in seconds 1229 1230 """ 1231 avg_time = time_taken / float(written) 1232 return (total_size - written) * avg_time
1233
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node while the disks are
  wiped chunk by chunk, and resumed afterwards even if wiping failed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from
    offset 0

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(inst_disks)]

  # Use a real list (not a map() result) for the RPC argument, so that it
  # does not depend on map() returning a list
  devices = [entry[1] for entry in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (devices, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # The chunk must never be zero, otherwise the loop below would not
      # advance for very small disks.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 *
                       constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      start_offset = offset
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          # Only the part wiped in this run counts for the ETA; data
          # before the start offset was never written by us
          eta = _CalcEta(now - start_time, offset - start_offset,
                         size - start_offset)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (devices, instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1328
def ImageDisks(lu, instance, image, disks=None):
  """Dumps an image onto an instance disk.

  Disk synchronization is paused on the primary node while the image is
  written and resumed afterwards, even if imaging failed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are imaged
  @type image: string
  @param image: the image passed to the C{blockdev_image} RPC
  @type disks: None or list of ints
  @param disks: disk indices; if None, only the first disk is imaged

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if disks is None:
    disks = [(0, inst_disks[0])]
  else:
    # Must be a real list: it is iterated several times below
    disks = [(idx, inst_disks[idx]) for idx in disks]

  devices = [device for (_, device) in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (devices, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device) in disks:
      lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
                 idx, instance.name, node_name)

      result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
                                          image, device.size)
      result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
                   (idx, instance.name, node_name))
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (devices, instance),
                                                    False)

    if result.fail_msg:
      # The arguments must match the placeholders: instance, node, reason
      lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
                    " node '%s': %s",
                    instance.name, node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Failed to resume synchronization of disk '%d' of"
                        " instance '%s'", idx, instance.name)
1391
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  If wiping fails, the failure is logged, the disk creation described by
  C{cleanup} is undone and the original error is raised again.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
    case of error
  @raise errors.OpExecError: re-raised after cleanup if wiping failed

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    failed_instance = instance.name
    logging.warning("Wiping disks for instance '%s' failed",
                    failed_instance)
    _UndoCreateDisks(lu, cleanup, instance)
    raise
1413
def ExpandCheckDisks(instance_disks, disks):
  """Return the instance disks selected by the disks list

  @type instance_disks: list of L{objects.Disk}
  @param instance_disks: all disks of the instance
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on; C{instance_disks} itself if
      no selection was given
  @raise errors.ProgrammerError: if a selected disk does not belong to the
      instance (compared by UUID)

  """
  if disks is None:
    return instance_disks

  inst_disks_uuids = [d.uuid for d in instance_disks]
  disks_uuids = [d.uuid for d in disks]
  foreign_uuids = set(disks_uuids).difference(inst_disks_uuids)
  if foreign_uuids:
    raise errors.ProgrammerError("Can only act on disks belonging to the"
                                 " target instance: expected a subset of %s,"
                                 " got %s" % (inst_disks_uuids, disks_uuids))
  return disks
1434
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  Only disks whose type is in C{constants.DTS_INT_MIRROR} are polled, on
  the primary node of the instance.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks are polled
  @param disks: the disks to wait for, or None for all instance disks
  @param oneshot: if true, query the status only once instead of waiting
      for the sync to finish (apart from retries on degraded disks)
  @return: True if no disk was found degraded (or there was nothing to
      check), False otherwise
  @raise errors.RemoteError: if the node could not be contacted ten times
      in a row

  """
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if not inst_disks:
    return True
  if disks is not None and not disks:
    return True

  disks = [d for d in ExpandCheckDisks(inst_disks, disks)
           if d.dev_type in constants.DTS_INT_MIRROR]

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  failed_queries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    status_result = lu.rpc.call_blockdev_getmirrorstatus(node_uuid,
                                                         (disks, instance))
    query_error = status_result.fail_msg
    if query_error:
      lu.LogWarning("Can't get any data from node %s: %s", node_name,
                    query_error)
      failed_queries += 1
      if failed_queries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue

    mirror_stats = status_result.payload
    failed_queries = 0
    for (i, mstat) in enumerate(mirror_stats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is None:
        continue

      done = False
      if mstat.estimated_time is None:
        rem_time = "no time estimate"
        max_time = 5 # sleep at least a bit between retries
      else:
        rem_time = ("%s remaining (estimated)" %
                    utils.FormatSeconds(mstat.estimated_time))
        max_time = mstat.estimated_time
      lu.LogInfo("- device %s: %5.2f%% done, %s",
                 disks[i].iv_name, mstat.sync_percent, rem_time)

    finished = done or oneshot

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if finished and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if finished:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1511
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.
  Errors on other nodes are ignored only if the RPC result reports the
  node as offline.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @param disks: the disks to shut down, or None for all instance disks;
      the instance disks are marked inactive only in the latter case
  @return: False if any error that is not ignored occurred, True otherwise

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  for disk in disks:
    for (node_uuid, top_disk) in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if not msg:
        continue

      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if node_uuid == instance.primary_node:
        counts_as_failure = not ignore_primary
      else:
        counts_as_failure = not result.offline
      if counts_as_failure:
        all_result = False
  return all_result
1544
def _SafeShutdownInstanceDisks(lu, instance, disks=None, req_states=None):
  """Shutdown block devices of an instance.

  This function checks the state of the instance before calling
  L{ShutdownInstanceDisks}.

  @param disks: passed on to L{ShutdownInstanceDisks}
  @param req_states: the instance states in which the shutdown is allowed;
      defaults to C{INSTANCE_DOWN}

  """
  allowed_states = INSTANCE_DOWN if req_states is None else req_states
  CheckInstanceState(lu, instance, allowed_states,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1557
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info, payloads): disks_ok is False
      if the operation failed; device_info is a list of
      (host, instance_visible_name, node_visible_name) with the mapping
      from node devices to instance devices; payloads holds the payloads
      of the RPC calls made on the primary node

  """
  def _DiskForRpc(node_disk):
    """Return the disk to send, without its size if ignore_size is set."""
    if not ignore_size:
      return node_disk
    sizeless = node_disk.Copy()
    sizeless.UnsetSize()
    return sizeless

  device_info = []
  disks_ok = True
  payloads = []

  if disks is None:
    # only mark instance disks as active if all disks are affected
    instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for (idx, inst_disk) in enumerate(disks):
    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      node_disk = _DiskForRpc(node_disk)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, False, idx)
      msg = result.fail_msg
      if not msg:
        continue

      secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance.uuid)
      is_offline_secondary = (node_uuid in secondary_nodes and
                              result.offline)
      lu.LogWarning("Could not prepare block device %s on node %s"
                    " (is_primary=False, pass=1): %s",
                    inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      if not (ignore_secondaries or is_offline_secondary):
        disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for (idx, inst_disk) in enumerate(disks):
    dev_path = None

    node_tree = inst_disk.ComputeNodeTree(instance.primary_node)
    for (node_uuid, node_disk) in node_tree:
      if node_uuid != instance.primary_node:
        continue
      node_disk = _DiskForRpc(node_disk)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, True, idx)
      payloads.append(result.payload)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _, __ = result.payload

    primary_name = lu.cfg.GetNodeName(instance.primary_node)
    device_info.append((primary_name, inst_disk.iv_name, dev_path))

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info, payloads
1659
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @param force: passed as C{ignore_secondaries} when assembling the disks;
      if it is neither None nor true, a hint about '--force' is logged on
      failure
  @raise errors.OpExecError: if the disks could not be assembled; they are
      shut down again before the error is raised

  """
  disks_ok, _, _ = AssembleInstanceDisks(lu, instance,
                                         ignore_secondaries=force)
  if disks_ok:
    return

  ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1677
1678 1679 -class LUInstanceGrowDisk(LogicalUnit):
1680 """Grow a disk of an instance. 1681 1682 """ 1683 HPATH = "disk-grow" 1684 HTYPE = constants.HTYPE_INSTANCE 1685 REQ_BGL = False 1686
def ExpandNames(self):
  """Declare the locks needed for growing a disk.

  The instance is locked; the node and node resource lock lists start
  empty and are set to be replaced when the locks are recalculated.

  """
  self._ExpandAndLockInstance()
  for lock_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
    self.needed_locks[lock_level] = []
    self.recalculate_locks[lock_level] = constants.LOCKS_REPLACE
    self.dont_collate_locks[lock_level] = True
1695
def DeclareLocks(self, level):
  """Fill in the node and node resource locks.

  @param level: the locking level for which locks are being declared

  """
  if level == locking.LEVEL_NODE:
    self._LockInstancesNodes()
    return

  if level == locking.LEVEL_NODE_RES:
    # Copy node locks
    node_locks = self.needed_locks[locking.LEVEL_NODE]
    self.needed_locks[locking.LEVEL_NODE_RES] = CopyLockList(node_locks)
1703
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on the master, the primary and all the secondaries.

  @return: the instance hook environment, extended with the C{DISK},
      C{AMOUNT} and C{ABSOLUTE} opcode values (instance-derived entries
      take precedence on key clashes)

  """
  env = dict(DISK=self.op.disk,
             AMOUNT=self.op.amount,
             ABSOLUTE=self.op.absolute)
  instance_env = BuildInstanceHookEnvByObject(self, self.instance)
  env.update(instance_env)
  return env
1717
def BuildHooksNodes(self):
  """Build hooks nodes.

  @return: a pair whose two members are the same list: the master node
      followed by all nodes of the instance

  """
  hook_nodes = [self.cfg.GetMasterNode()]
  hook_nodes.extend(self.cfg.GetInstanceNodes(self.instance.uuid))
  return (hook_nodes, hook_nodes)
1725
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster.

  In addition all nodes of the instance must be online, the disk must be
  of a growable type and the requested size must not shrink the disk.
  The method sets C{self.instance}, C{self.node_es_flags}, C{self.disk},
  C{self.target} and C{self.delta}, and finally checks the free disk
  space and the instance policy.

  @raise errors.OpPrereqError: if the disk type cannot be grown or the
      requested size/increment is invalid

  """
  self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
  assert self.instance is not None, \
    "Cannot retrieve locked instance %s" % self.op.instance_name

  instance_nodes = list(self.cfg.GetInstanceNodes(self.instance.uuid))
  for node_uuid in instance_nodes:
    CheckNodeOnline(self, node_uuid)
  self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                                       instance_nodes)

  disk_uuid = self.instance.FindDisk(self.op.disk)
  self.disk = self.cfg.GetDiskInfo(disk_uuid)

  if self.disk.dev_type not in constants.DTS_GROWABLE:
    raise errors.OpPrereqError(
      "Instance's disk layout %s does not support"
      " growing" % self.disk.dev_type, errors.ECODE_INVAL)

  if self.op.absolute:
    self.target = self.op.amount
    self.delta = self.target - self.disk.size
    if self.delta < 0:
      requested_text = utils.FormatUnit(self.target, "h")
      current_text = utils.FormatUnit(self.disk.size, "h")
      raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                 "current disk size (%s)" %
                                 (requested_text, current_text),
                                 errors.ECODE_STATE)
  else:
    self.delta = self.op.amount
    self.target = self.disk.size + self.delta
    if self.delta < 0:
      raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                 utils.FormatUnit(self.delta, "h"),
                                 errors.ECODE_INVAL)

  required_space = self.disk.ComputeGrowth(self.delta)
  self._CheckDiskSpace(instance_nodes, required_space)

  self._CheckIPolicy(self.target)
1767
def _CheckDiskSpace(self, node_uuids, req_vgspace):
  """Check that the nodes have enough free space for the growth.

  The check is skipped for disk types listed in
  C{constants.DTS_NO_FREE_SPACE_CHECK} and whenever any flag in
  C{self.node_es_flags} is set.

  @param node_uuids: the nodes to check
  @param req_vgspace: the space requirement, passed through unchanged to
      C{CheckNodesFreeDiskPerVG}

  """
  # TODO: check the free disk space for file, when that feature will be
  # supported
  if self.disk.dev_type in constants.DTS_NO_FREE_SPACE_CHECK:
    return

  # With exclusive storage we need to do something smarter than just looking
  # at free space, which, in the end, is basically a dry run. So we rely on
  # the dry run performed in Exec() instead.
  if any(self.node_es_flags.values()):
    return

  CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1778
def _CheckIPolicy(self, target_size):
  """Check the grown disk against the instance policy of the primary group.

  @param target_size: the size the disk will have after growing
  @raise errors.OpPrereqError: if the policy is violated and the opcode
      does not set C{ignore_ipolicy}; otherwise a violation is only logged
      as a warning

  """
  cluster = self.cfg.GetClusterInfo()
  primary_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_uuid,
                                                  primary_only=True)
  group_info = self.cfg.GetNodeGroup(list(primary_groups)[0])
  ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                          group_info)

  disks = self.cfg.GetInstanceDisks(self.op.instance_uuid)
  # Same sizes as today, except for the disk being grown
  disk_sizes = []
  for disk in disks:
    if disk.uuid != self.disk.uuid:
      disk_sizes.append(disk.size)
    else:
      disk_sizes.append(target_size)

  # The ipolicy checker below ignores None, so we only give it the disk size
  violations = ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, disks)
  if not violations:
    return

  msg = ("Growing disk %s violates policy: %s" %
         (self.op.disk,
          utils.CommaJoin(violations)))
  if not self.op.ignore_ipolicy:
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  self.LogWarning(msg)
1801
def Exec(self, feedback_fn):
  """Execute disk grow.

  The grow is first attempted in dry-run mode on every node of the
  instance, then performed for real on the backing storage of every node
  and finally on the logical storage of the primary node. Afterwards the
  new size is recorded in the configuration, the node locks are released
  and, if the cluster has C{prealloc_wipe_disks} set, the added space is
  wiped.

  @param feedback_fn: function used to send feedback back to the caller
  @raise errors.OpExecError: if the disk cannot be activated or its current
      size cannot be retrieved for wiping

  """
  assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
  assert (self.owned_locks(locking.LEVEL_NODE) ==
          self.owned_locks(locking.LEVEL_NODE_RES))

  wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

  disks_ok, _, _ = AssembleInstanceDisks(self, self.instance,
                                         disks=[self.disk])
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block device to grow")

  feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
              (self.op.disk, self.instance.name,
               utils.FormatUnit(self.delta, "h"),
               utils.FormatUnit(self.target, "h")))

  # First run all grow ops in dry-run mode
  inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
  for node_uuid in inst_nodes:
    result = self.rpc.call_blockdev_grow(node_uuid,
                                         (self.disk, self.instance),
                                         self.delta, True, True,
                                         self.node_es_flags[node_uuid])
    result.Raise("Dry-run grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

  if wipe_disks:
    # Report the primary node by name, like every other message in this
    # method, instead of by its UUID
    primary_name = self.cfg.GetNodeName(self.instance.primary_node)

    # Get disk size from primary node for wiping
    result = self.rpc.call_blockdev_getdimensions(
        self.instance.primary_node, [([self.disk], self.instance)])
    result.Raise("Failed to retrieve disk size from node '%s'" %
                 primary_name)

    (disk_dimensions, ) = result.payload

    if disk_dimensions is None:
      raise errors.OpExecError("Failed to retrieve disk size from primary"
                               " node '%s'" % primary_name)
    (disk_size_in_bytes, _) = disk_dimensions

    old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

    assert old_disk_size >= self.disk.size, \
      ("Retrieved disk size too small (got %s, should be at least %s)" %
       (old_disk_size, self.disk.size))
  else:
    old_disk_size = None

  # We know that (as far as we can test) operations across different
  # nodes will succeed, time to run it for real on the backing storage
  for node_uuid in inst_nodes:
    result = self.rpc.call_blockdev_grow(node_uuid,
                                         (self.disk, self.instance),
                                         self.delta, False, True,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

  # And now execute it for logical storage, on the primary node
  node_uuid = self.instance.primary_node
  result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                       self.delta, False, False,
                                       self.node_es_flags[node_uuid])
  result.Raise("Grow request failed to node %s" %
               self.cfg.GetNodeName(node_uuid))

  self.disk.RecordGrow(self.delta)
  self.cfg.Update(self.instance, feedback_fn)
  self.cfg.Update(self.disk, feedback_fn)

  # Changes have been recorded, release node lock
  ReleaseLocks(self, locking.LEVEL_NODE)

  # Downgrade lock while waiting for sync
  self.WConfdClient().DownGradeLocksLevel(
      locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])

  # old_disk_size is known exactly when wiping was requested
  assert wipe_disks ^ (old_disk_size is None)

  if wipe_disks:
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    assert inst_disks[self.op.disk] == self.disk

    # Wipe newly added disk space
    WipeDisks(self, self.instance,
              disks=[(self.op.disk, self.disk, old_disk_size)])

  if self.op.wait_for_sync:
    disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
    if disk_abort:
      self.LogWarning("Disk syncing has not returned a good status; check"
                      " the instance")
    if not self.instance.disks_active:
      _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
  elif not self.instance.disks_active:
    self.LogWarning("Not shutting down the disk even if the instance is"
                    " not supposed to be running because no wait for"
                    " sync mode was requested")

  assert self.owned_locks(locking.LEVEL_NODE_RES)
  assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1907
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The replacement itself is delegated to a L{TLReplaceDisks} tasklet that
  is created in L{ExpandNames}; this class validates the opcode arguments,
  declares the needed locks and builds the hooks environment.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    A new node or an iallocator must be given when changing the secondary,
    and neither may be given in any other mode.

    @raise errors.OpPrereqError: if the combination of mode, remote node
        and iallocator is invalid

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance, prepare the other lock levels, build the tasklet.

    """
    self._ExpandAndLockInstance(allow_forthcoming=True)

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []

    # Filled in by DeclareLocks from the node locks
    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True
    self.dont_collate_locks[locking.LEVEL_NODE_RES] = True

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Compute the locks needed at the given level.

    @param level: the locking level being declared

    """
    if level == locking.LEVEL_NODEGROUP:
      # Node group locks are only requested when an iallocator is used
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    @return: the instance hook environment, extended with the C{MODE},
        C{NEW_SECONDARY} and C{OLD_SECONDARY} keys

    """
    instance = self.replacer.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      # NOTE(review): indexes the first secondary unconditionally; this
      # presumably relies on the tasklet's CheckPrereq having verified that
      # exactly one secondary exists -- confirm the call order
      "OLD_SECONDARY": self.cfg.GetNodeName(secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: a C{(pre, post)} tuple; both elements are the same list, made
        of the master node, the primary node and, if one was given in the
        opcode, the new secondary node

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Re-verifies the optimistically acquired node group locks, if any, and
    then defers to L{LogicalUnit.CheckPrereq}.

    """
    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)
2047
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Lock the instance's nodes when the node level is declared.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its primary
    node is online.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    @return: the disk information returned by C{AssembleInstanceDisks}
    @raise errors.OpExecError: if the disks cannot be assembled or, when
        waiting for sync was requested, the sync does not succeed

    """
    (assembled, disks_info, _) = AssembleInstanceDisks(
        self, self.instance, ignore_size=self.op.ignore_size)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      # Record the disks as inactive before reporting the failure
      self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
2091
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its node locks are computed in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Lock the instance's nodes when the node level is declared.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    With C{force} set in the opcode C{ShutdownInstanceDisks} is used,
    otherwise C{_SafeShutdownInstanceDisks}.

    """
    if not self.op.force:
      _SafeShutdownInstanceDisks(self, self.instance)
    else:
      ShutdownInstanceDisks(self, self.instance)
2126
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  The device itself is only queried when C{on_primary} is set or the device
  reports that it is assembled on secondaries. Children are checked
  recursively, always with the default C{ldisk} value, and only as long as
  no failure has been found.

  @return: True if no checked device is missing or degraded

  """
  healthy = True

  if on_primary or dev.AssembleOnSecondary():
    found = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    failure = found.fail_msg
    if failure:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), failure)
      healthy = False
    elif not found.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      healthy = False
    elif ldisk:
      healthy = found.payload.ldisk_status == constants.LDS_OKAY
    else:
      healthy = not found.payload.is_degraded

  for child in dev.children or ():
    if not healthy:
      # A failure was already found; do not query the remaining children
      break
    healthy = _CheckDiskConsistencyInner(lu, instance, child,
                                         node_uuid, on_primary)

  return healthy
2163
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  The device is annotated with its disk parameters before being checked;
  all other arguments are passed through unchanged.

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  (annotated_dev,) = annotated
  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node_uuid,
                                    on_primary, ldisk=ldisk)
2172
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  (annotated_dev,) = annotated
  return lu.rpc.call_blockdev_find(node_uuid, (annotated_dev, instance))
2186
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  A fresh unique ID is requested from the configuration for every entry of
  C{exts} and used as prefix for that entry.

  @param lu: the logical unit providing C{cfg} and C{proc}
  @param exts: the suffixes to append to the generated IDs
  @return: list of names, in the same order as C{exts}

  """
  return ["%s%s" % (lu.cfg.GenerateUniqueID(lu.proc.GetECId()), ext)
          for ext in exts]
2199
2200 2201 -class TLReplaceDisks(Tasklet):
2202 """Replaces disks for an instance. 2203 2204 Note: Locking is not within the scope of this class. 2205 2206 """
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
             remote_node_uuid, disks, early_release, ignore_ipolicy):
  """Initializes this class.

  @param lu: the logical unit owning this tasklet
  @param instance_uuid: UUID of the instance whose disks are replaced
  @param instance_name: name of the instance, used in messages
  @param mode: the disk replacement mode
  @param iallocator_name: iallocator used to pick a new secondary, or None
  @param remote_node_uuid: UUID of the requested new secondary, or None
  @param disks: the disks to replace
  @param early_release: whether to release resources early
  @param ignore_ipolicy: whether instance policy violations are ignored

  """
  Tasklet.__init__(self, lu)

  # Parameters
  self.instance_uuid = instance_uuid
  self.instance_name = instance_name
  self.mode = mode
  self.iallocator_name = iallocator_name
  self.remote_node_uuid = remote_node_uuid
  self.disks = disks
  self.early_release = early_release
  self.ignore_ipolicy = ignore_ipolicy

  # Runtime data; these are filled in by CheckPrereq
  self.instance = None
  self.new_node_uuid = None
  self.target_node_uuid = None
  self.other_node_uuid = None
  self.remote_node_info = None
  self.node_secondary_ip = None
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
                  relocate_from_node_uuids):
  """Compute a new secondary node using an IAllocator.

  @param lu: the logical unit providing C{cfg}, C{rpc} and logging
  @param iallocator_name: name of the iallocator to run
  @param instance_uuid: UUID of the instance to relocate
  @param relocate_from_node_uuids: nodes the instance is relocated from
  @return: UUID of the node selected by the iallocator
  @raise errors.OpPrereqError: if the iallocator fails or names a node
      that is not in the configuration

  """
  request = iallocator.IAReqRelocate(
    inst_uuid=instance_uuid,
    relocate_from_node_uuids=list(relocate_from_node_uuids))
  allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
  allocator.Run(iallocator_name)

  if not allocator.success:
    raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                               " %s" % (iallocator_name, allocator.info),
                               errors.ECODE_NORES)

  # Only the first entry of the result is used
  chosen_name = allocator.result[0]
  chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)
  if chosen_node is None:
    raise errors.OpPrereqError("Node %s not found in configuration" %
                               chosen_name, errors.ECODE_NOENT)

  lu.LogInfo("Selected new secondary for instance '%s': %s",
             instance_uuid, chosen_name)

  return chosen_node.uuid
2261
def _FindFaultyDisks(self, node_uuid):
  """Wrapper for L{FindFaultyInstanceDisks}.

  @param node_uuid: the node on which to look for faulty disks
  @return: whatever L{FindFaultyInstanceDisks} returns for this instance

  """
  faulty = FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)
  return faulty
2268
def _CheckDisksActivated(self, instance):
  """Checks if the instance disks are activated.

  Results from offline nodes are ignored.

  @param instance: The instance to check disks
  @return: True if they are activated, False otherwise

  """
  node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
  inst_disks = self.cfg.GetInstanceDisks(instance.uuid)

  for idx, dev in enumerate(inst_disks):
    for node_uuid in node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", idx,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, dev, instance)

      if not found.offline and (found.fail_msg or not found.payload):
        return False

  return True
2291
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster, that it has exactly one
  secondary node and only DRBD8 disks, and works out from the replacement
  mode which node is the target, which is the peer and, when changing the
  secondary, which is the new node. Locks on nodes that are not touched
  are released at the end.

  @raise errors.OpPrereqError: if the instance layout, the selected node
      or the requested disks are not suitable for the operation
  @raise errors.ProgrammerError: if the replacement mode is unknown

  """
  self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
  assert self.instance is not None, \
    "Cannot retrieve locked instance %s" % self.instance_name

  secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
  if len(secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(secondary_nodes),
                               errors.ECODE_FAULT)

  secondary_node_uuid = secondary_nodes[0]

  # The new secondary is either given explicitly or chosen by the iallocator
  if self.iallocator_name is None:
    remote_node_uuid = self.remote_node_uuid
  else:
    remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                          self.instance.uuid,
                                          secondary_nodes)

  if remote_node_uuid is None:
    self.remote_node_info = None
  else:
    assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
           "Remote node '%s' is not locked" % remote_node_uuid

    self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
    assert self.remote_node_info is not None, \
      "Cannot retrieve locked node %s" % remote_node_uuid

  if remote_node_uuid == self.instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance", errors.ECODE_INVAL)

  if remote_node_uuid == secondary_node_uuid:
    raise errors.OpPrereqError("The specified node is already the"
                               " secondary node of the instance",
                               errors.ECODE_INVAL)

  if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                  constants.REPLACE_DISK_CHG):
    raise errors.OpPrereqError("Cannot specify disks to be replaced",
                               errors.ECODE_INVAL)

  if self.mode == constants.REPLACE_DISK_AUTO:
    if not self._CheckDisksActivated(self.instance):
      raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                 " first" % self.instance_name,
                                 errors.ECODE_STATE)
    faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
    faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

    if faulty_primary and faulty_secondary:
      raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                 " one node and can not be repaired"
                                 " automatically" % self.instance_name,
                                 errors.ECODE_STATE)

    if faulty_primary:
      self.disks = faulty_primary
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    elif faulty_secondary:
      self.disks = faulty_secondary
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]
    else:
      # Nothing is faulty; Exec reports this and returns without changes
      self.disks = []
      check_nodes = []

  else:
    # Non-automatic modes
    if self.mode == constants.REPLACE_DISK_PRI:
      self.target_node_uuid = self.instance.primary_node
      self.other_node_uuid = secondary_node_uuid
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_SEC:
      self.target_node_uuid = secondary_node_uuid
      self.other_node_uuid = self.instance.primary_node
      check_nodes = [self.target_node_uuid, self.other_node_uuid]

    elif self.mode == constants.REPLACE_DISK_CHG:
      self.new_node_uuid = remote_node_uuid
      self.other_node_uuid = self.instance.primary_node
      self.target_node_uuid = secondary_node_uuid
      # The old secondary is deliberately not in this list
      check_nodes = [self.new_node_uuid, self.other_node_uuid]

      CheckNodeNotDrained(self.lu, remote_node_uuid)
      CheckNodeVmCapable(self.lu, remote_node_uuid)

      old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
      assert old_node_info is not None
      if old_node_info.offline and not self.early_release:
        # doesn't make sense to delay the release
        self.early_release = True
        # NOTE(review): this message shows the node UUID, while other
        # messages in this class use the node name
        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                        " early-release mode", secondary_node_uuid)

    else:
      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

  disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  if (not disks or
      not utils.AllDiskOfType(disks, [constants.DT_DRBD8])):
    raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                               " instances", errors.ECODE_INVAL)

  # TODO: This is ugly, but right now we can't distinguish between internal
  # submitted opcode and external one. We should fix that.
  if self.remote_node_info:
    # We change the node, lets verify it still meets instance policy
    new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            new_group_info)
    CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
                           self.remote_node_info, self.cfg,
                           ignore=self.ignore_ipolicy)

  for node_uuid in check_nodes:
    CheckNodeOnline(self.lu, node_uuid)

  touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                        self.other_node_uuid,
                                                        self.target_node_uuid]
                            if node_uuid is not None)

  # Release unneeded node and node resource locks
  ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
  ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)

  # Release any owned node group
  ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

  # Check whether disks are valid
  for disk_idx in self.disks:
    self.instance.FindDisk(disk_idx)

  # Get secondary node IP addresses
  self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                in self.cfg.GetMultiNodeInfo(touched_nodes))
2447
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler.

  If the instance's disks are not active (and the instance is not
  forthcoming) they are started before the replacement and shut down again
  afterwards, even when the handler fails.

  @param feedback_fn: function used to send feedback back to the caller
  @return: the result of the handler, or None if no disk needs replacement

  """
  if __debug__:
    # Verify owned locks before starting operation
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
    assert set(owned_nodes) == set(self.node_secondary_ip), \
        ("Incorrect node locks, owning %s, expected %s" %
         (owned_nodes, self.node_secondary_ip.keys()))
    assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
            self.lu.owned_locks(locking.LEVEL_NODE_RES))

    owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
    assert list(owned_instances) == [self.instance_name], \
        "Instance '%s' not locked" % self.instance_name

  if not self.disks:
    feedback_fn("No disks need replacement for instance '%s'" %
                self.instance.name)
    return

  feedback_fn("Replacing disk(s) %s for instance '%s'" %
              (utils.CommaJoin(self.disks), self.instance.name))
  feedback_fn("Current primary node: %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
  feedback_fn("Current secondary node: %s" %
              utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))

  activate_disks = not self.instance.disks_active

  # Activate the instance disks if we're replacing them on a down instance
  # that is real (forthcoming instances currently only have forthcoming
  # disks).
  if activate_disks and not self.instance.forthcoming:
    StartInstanceDisks(self.lu, self.instance, True)
    # Re-read the instance object modified by the previous call
    self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)

  try:
    # Should we replace the secondary node?
    if self.new_node_uuid is not None:
      fn = self._ExecDrbd8Secondary
    else:
      fn = self._ExecDrbd8DiskOnly

    result = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a
    # down instance
    if activate_disks and not self.instance.forthcoming:
      _SafeShutdownInstanceDisks(self.lu, self.instance,
                                 req_states=INSTANCE_NOT_RUNNING)

  self.lu.AssertReleasedLocks(locking.LEVEL_NODE)

  if __debug__:
    # Verify owned locks
    owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
    nodes = frozenset(self.node_secondary_ip)
    # With early release no resource lock may be left; otherwise only
    # locks on the touched nodes may remain
    assert ((self.early_release and not owned_nodes) or
            (not self.early_release and not (set(owned_nodes) - nodes))), \
      ("Not owning the correct locks, early_release=%s, owned=%r,"
       " nodes=%r" % (self.early_release, owned_nodes, nodes))

  return result
2517
def _CheckVolumeGroup(self, node_uuids):
  """Check that the cluster volume group exists on the given nodes.

  @param node_uuids: the nodes to check
  @raise errors.OpExecError: if the volume groups cannot be listed or the
      volume group is missing on one of the nodes

  """
  self.lu.LogInfo("Checking volume groups")

  vgname = self.cfg.GetVGName()

  # Make sure volume group exists on all involved nodes
  vg_lists = self.rpc.call_vg_list(node_uuids)
  if not vg_lists:
    raise errors.OpExecError("Can't list volume groups on the nodes")

  for node_uuid in node_uuids:
    node_result = vg_lists[node_uuid]
    node_result.Raise("Error checking node %s" %
                      self.cfg.GetNodeName(node_uuid))
    if vgname in node_result.payload:
      continue
    raise errors.OpExecError("Volume group '%s' not found on node %s" %
                             (vgname, self.cfg.GetNodeName(node_uuid)))
2534
def _CheckDisksExistence(self, node_uuids):
  """Check that the disks to be replaced exist on the given nodes.

  @param node_uuids: the nodes on which every selected disk must be found
  @raise errors.OpExecError: if a disk cannot be found on a node; the
      message carries an extra hint when the disks do not seem activated

  """
  # Check disk existence
  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  for idx, dev in enumerate(inst_disks):
    if idx not in self.disks:
      continue

    for node_uuid in node_uuids:
      self.lu.LogInfo("Checking disk/%d on %s", idx,
                      self.cfg.GetNodeName(node_uuid))

      found = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = found.fail_msg
      if not msg and found.payload:
        continue

      msg = msg or "disk not found"
      if self._CheckDisksActivated(self.instance):
        extra_hint = ""
      else:
        extra_hint = ("\nDisks seem to be not properly activated. Try"
                      " running activate-disks on the instance before"
                      " using replace-disks.")
      raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                               (idx, self.cfg.GetNodeName(node_uuid), msg,
                                extra_hint))
2560
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
  """Check that the disks to be replaced are consistent on a node.

  @param node_uuid: the node to check
  @param on_primary: whether C{node_uuid} is the instance's primary node
  @param ldisk: passed through to L{CheckDiskConsistency}
  @raise errors.OpExecError: if a selected disk is not consistent

  """
  node_name_fn = self.cfg.GetNodeName
  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)

  for idx, dev in enumerate(inst_disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                    (idx, node_name_fn(node_uuid)))

    consistent = CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                      on_primary, ldisk=ldisk)
    if consistent:
      continue

    raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                             " replace disks for instance %s" %
                             (node_name_fn(node_uuid),
                              self.instance.name))
2575
def _CreateNewStorage(self, node_uuid):
  """Create new storage on the primary or secondary node.

  This is only used for same-node replaces, not for changing the
  secondary node, hence we don't want to modify the existing disk.

  For every selected disk a new data LV and a new meta LV are created on
  the node, in the volume groups of the disk's current children.

  @param node_uuid: the node on which to create the new LVs
  @return: dict mapping each disk's C{iv_name} to a
      C{(dev, old_lvs, new_lvs)} tuple, where C{old_lvs} are copies of the
      current children
  @raise errors.OpExecError: if a block device cannot be created

  """
  iv_names = {}

  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
  for idx, dev in enumerate(disks):
    if idx not in self.disks:
      continue

    self.lu.LogInfo("Adding storage on %s for disk/%d",
                    self.cfg.GetNodeName(node_uuid), idx)

    lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(self.lu, lv_names)

    (data_disk, meta_disk) = dev.children
    vg_data = data_disk.logical_id[0]
    lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                           logical_id=(vg_data, names[0]),
                           params=data_disk.params)
    vg_meta = meta_disk.logical_id[0]
    lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                           size=constants.DRBD_META_SIZE,
                           logical_id=(vg_meta, names[1]),
                           params=meta_disk.params)

    new_lvs = [lv_data, lv_meta]
    old_lvs = [child.Copy() for child in dev.children]
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

    # we pass force_create=True to force the LVM creation
    for new_lv in new_lvs:
      try:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  return iv_names
2623
def _CheckDevices(self, node_uuid, iv_names):
  """Check that all replaced DRBD devices are present and not degraded.

  @param node_uuid: the node on which to look up the devices
  @param iv_names: dict mapping a device name to a tuple whose first
      element is the device, as built by L{_CreateNewStorage}
  @raise errors.OpExecError: if a device cannot be found or is degraded

  """
  # items() instead of iteritems(): works on Python 2 and Python 3
  for name, (dev, _, _) in iv_names.items():
    result = _BlockdevFind(self, node_uuid, dev, self.instance)

    msg = result.fail_msg
    if msg or not result.payload:
      if not msg:
        msg = "disk not found"
      raise errors.OpExecError("Can't find DRBD device %s: %s" %
                               (name, msg))

    if result.payload.is_degraded:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)
2637
def _RemoveOldStorage(self, node_uuid, iv_names):
  """Remove the old logical volumes of the replaced disks.

  Removal failures are only logged as warnings, with a hint to clean up
  manually; they do not abort the operation.

  @param node_uuid: the node holding the old LVs
  @param iv_names: dict mapping a device name to a tuple whose second
      element is the list of old LVs, as built by L{_CreateNewStorage}

  """
  # items() instead of iteritems(): works on Python 2 and Python 3
  for name, (_, old_lvs, _) in iv_names.items():
    self.lu.LogInfo("Remove logical volumes for %s", name)

    for lv in old_lvs:
      msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
              .fail_msg
      if msg:
        self.lu.LogWarning("Can't remove old LV: %s", msg,
                           hint="remove unused LVs manually")
2648
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
  """Replace a disk on the primary or secondary for DRBD 8.

  The algorithm for replace is quite complicated:

    1. for each disk to be replaced:

      1. create new LVs on the target node with unique names
      1. detach old LVs from the drbd device
      1. rename old LVs to <name>_replaced-<time_t>
      1. rename new LVs to old LVs
      1. attach the new LVs (with the old names now) to the drbd device

    1. wait for sync across all devices

    1. for each modified disk:

      1. remove old LVs (which have the name <name>_replaced-<time_t>)

  Failures are not very well handled.

  @param feedback_fn: function used to send feedback back to the caller
  @raise errors.OpExecError: if the new storage cannot be attached to the
      drbd device

  """
  steps_total = 6

  if self.instance.forthcoming:
    feedback_fn("Instance forthcoming, not touching disks")
    return

  def _ReplacedId(disk, suffix):
    """Return the logical_id a replaced LV is renamed to."""
    return (disk.logical_id[0],
            disk.logical_id[1] + "_replaced-%s" % suffix)

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
  self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(
    self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
    False)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  iv_names = self._CreateNewStorage(self.target_node_uuid)

  # Step: for each lv, detach+rename*2+attach
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  # values() instead of itervalues(): works on Python 2 and Python 3
  for dev, old_lvs, new_lvs in iv_names.values():
    self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

    result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                   (dev, self.instance),
                                                   (old_lvs, self.instance))
    result.Raise("Can't detach drbd from local storage on node"
                 " %s for device %s" %
                 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == unique_id on that node

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())

    # Build the rename list based on what LVs exist on the node
    rename_old_to_new = []
    for to_ren in old_lvs:
      result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                           (to_ren, self.instance))
      if not result.fail_msg and result.payload:
        # device exists
        rename_old_to_new.append((to_ren, _ReplacedId(to_ren, temp_suffix)))

    self.lu.LogInfo("Renaming the old LVs on the target node")
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_old_to_new)
    result.Raise("Can't rename old LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Now we rename the new LVs to the old LVs
    self.lu.LogInfo("Renaming the new LVs on the target node")
    rename_new_to_old = [(new, old.logical_id)
                         for old, new in zip(old_lvs, new_lvs)]
    result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                           rename_new_to_old)
    result.Raise("Can't rename new LVs on node %s" %
                 self.cfg.GetNodeName(self.target_node_uuid))

    # Intermediate steps of in memory modifications
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id

    # We need to modify old_lvs so that removal later removes the
    # right LVs, not the newly added ones; note that old_lvs is a
    # copy here
    for disk in old_lvs:
      disk.logical_id = _ReplacedId(disk, temp_suffix)

    # Now that the new lvs have the old name, we can add them to the device
    self.lu.LogInfo("Adding new mirror component on %s",
                    self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                (dev, self.instance),
                                                (new_lvs, self.instance))
    msg = result.fail_msg
    if msg:
      # Best-effort rollback: remove the LVs that could not be attached
      for new_lv in new_lvs:
        msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                             (new_lv, self.instance)).fail_msg
        if msg2:
          self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                             hint=("cleanup manually the unused logical"
                                   " volumes"))
      raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

  # Steps 5 and 6 swap order depending on early_release
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # Release all node locks while waiting for sync
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(next(cstep), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
2798
def _UpdateDisksSecondary(self, iv_names, feedback_fn):
  """Record the new secondary node in the configuration of every disk.

  Each disk receives its new logical id, is written back to the
  configuration and gets its node list set to the primary node of the
  instance plus the new node. The instance itself is written back last.

  @param iv_names: mapping whose values are triples describing the volumes
      of the instance; the first element of a triple is the disk object and
      the third element the logical id to assign to it (the second element
      is not used here)
  @param feedback_fn: function used to send feedback back to the caller of
      the OpCode; it is handed on to the configuration updates

  """
  config = self.cfg

  self.lu.LogInfo("Updating instance configuration")

  for volume in iv_names.values():
    (disk, _unused, logical_id) = volume
    disk.logical_id = logical_id
    config.Update(disk, feedback_fn)

    disk_uuid = disk.uuid
    # a fresh list for every disk, so no list object is shared between calls
    node_uuids = [self.instance.primary_node, self.new_node_uuid]
    config.SetDiskNodes(disk_uuid, node_uuids)

  config.Update(self.instance, feedback_fn)
2816
def _ExecDrbd8Secondary(self, feedback_fn):
  """Replace the secondary node for DRBD 8.

  The algorithm for replace is quite complicated:
    - for all disks of the instance:
      - create new LVs on the new node with same names
      - shutdown the drbd device on the old secondary
      - disconnect the drbd network on the primary
      - create the drbd device on the new secondary
      - network attach the drbd on the primary, using an artifice:
        the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
    - wait for sync across all devices
    - remove all disks from the old secondary

  For a forthcoming instance only the configuration is changed: new DRBD
  minors are allocated for the new node, the disks are pointed at the new
  secondary and the node locks are released.

  Failures are not very well handled.

  @param feedback_fn: function used to send feedback back to the caller of
      the OpCode
  @raise errors.OpExecError: if the new logical volumes cannot be created,
      if the primary cannot be detached from the network, or if attaching
      the disks to the network fails on any node

  """
  if self.instance.forthcoming:
    # As the feedback message says, this branch only rewrites the
    # configuration and then returns early
    feedback_fn("Instance forthcoming, will only update the configuration")
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in inst_disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)
    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
      (o_node1, _, o_port, o_minor1, o_minor2, o_secret) = \
          dev.logical_id
      # keep the minor that belongs to the primary node
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)
      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
    self._UpdateDisksSecondary(iv_names, feedback_fn)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
    return

  steps_total = 6

  pnode = self.instance.primary_node

  # Step: check device activation
  self.lu.LogStep(1, steps_total, "Check device existence")
  self._CheckDisksExistence([self.instance.primary_node])
  self._CheckVolumeGroup([self.instance.primary_node])

  # Step: check other node consistency
  self.lu.LogStep(2, steps_total, "Check peer consistency")
  self._CheckDisksConsistency(self.instance.primary_node, True, True)

  # Step: create new storage
  self.lu.LogStep(3, steps_total, "Allocate new storage")
  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                self.new_node_uuid)
  for idx, dev in enumerate(disks):
    self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # we pass force_create=True to force LVM creation
    for new_lv in dev.children:
      try:
        _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                             new_lv, True, GetInstanceInfoText(self.instance),
                             False, excl_stor)
      except errors.DeviceCreationError as e:
        raise errors.OpExecError("Can't create block device: %s" % e.message)

  # Step 4: drbd minors and drbd setups changes
  # after this, we must manually remove the drbd minors on both the
  # error and the success paths
  self.lu.LogStep(4, steps_total, "Changing drbd configuration")
  minors = []
  for disk in inst_disks:
    minor = self.cfg.AllocateDRBDMinor([self.new_node_uuid], disk.uuid)
    minors.append(minor[0])
  logging.debug("Allocated minors %r", minors)

  iv_names = {}
  for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
    self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                    (self.cfg.GetNodeName(self.new_node_uuid), idx))
    # create new devices on new_node; note that we create two IDs:
    # one without port, so the drbd will be activated without
    # networking information on the new node at this stage, and one
    # with network, for the latter activation in step 4
    (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
    if self.instance.primary_node == o_node1:
      p_minor = o_minor1
    else:
      assert self.instance.primary_node == o_node2, "Three-node instance?"
      p_minor = o_minor2

    new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                    p_minor, new_minor, o_secret)
    new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                  p_minor, new_minor, o_secret)

    iv_names[idx] = (dev, dev.children, new_net_id)
    logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                  new_net_id)
    new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                            logical_id=new_alone_id,
                            children=dev.children,
                            size=dev.size,
                            params={})
    (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                          self.cfg)
    try:
      CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                           anno_new_drbd,
                           GetInstanceInfoText(self.instance), False,
                           excl_stor)
    except errors.GenericError:
      # give back the minors of all disks before propagating the error
      for disk in inst_disks:
        self.cfg.ReleaseDRBDMinors(disk.uuid)
      raise

  # We have new devices, shutdown the drbd on the old secondary

  for idx, dev in enumerate(inst_disks):
    self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
    msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                          (dev, self.instance)).fail_msg
    if msg:
      # a failed shutdown is only reported, the replacement goes on
      self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                         " node: %s" % (idx, msg),
                         hint=("Please cleanup this device manually as"
                               " soon as possible"))

  self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
  result = self.rpc.call_drbd_disconnect_net(
    [pnode], (inst_disks, self.instance))[pnode]

  msg = result.fail_msg
  if msg:
    # detaches didn't succeed (unlikely)
    for disk in inst_disks:
      self.cfg.ReleaseDRBDMinors(disk.uuid)
    raise errors.OpExecError("Can't detach the disks from the network on"
                             " old node: %s" % (msg,))

  # if we managed to detach at least one, we update all the disks of
  # the instance to point to the new secondary
  self._UpdateDisksSecondary(iv_names, feedback_fn)

  # Release all node locks (the configuration has been updated)
  ReleaseLocks(self.lu, locking.LEVEL_NODE)

  # and now perform the drbd attach
  self.lu.LogInfo("Attaching primary drbds to new secondary"
                  " (standalone => connected)")
  # read the disks again, as the configuration was changed above
  inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
  result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                          self.new_node_uuid],
                                         (inst_disks, self.instance),
                                         False)
  for to_node, to_result in result.items():
    msg = to_result.fail_msg
    if msg:
      raise errors.OpExecError(
        "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
        "info %s to see the status of disks)" %
        (self.cfg.GetNodeName(to_node), msg, self.instance.name))

  # steps 5 and 6 swap places depending on early_release, hence the counter
  cstep = itertools.count(5)

  if self.early_release:
    self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
    # TODO: Check if releasing locks early still makes sense
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
  else:
    # Release all resource locks except those used by the instance
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                 keep=self.node_secondary_ip.keys())

  # TODO: Can the instance lock be downgraded here? Take the optional disk
  # shutdown in the caller into consideration.

  # Wait for sync
  # This can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its return value
  self.lu.LogStep(next(cstep), steps_total, "Sync devices")
  WaitForSync(self.lu, self.instance)

  # Check all devices manually
  self._CheckDevices(self.instance.primary_node, iv_names)

  # Step: remove old storage
  if not self.early_release:
    self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node_uuid, iv_names)
3016
class TemporaryDisk():
  """Context manager for temporary, bootable disks of an instance.

  Meant to be used with the ``with`` statement: entering the block creates
  the disks and leaving it destroys them again, whether or not the block
  raised an exception.

  The disks are added to the instance starting at index 0, shifting any
  other disks of the instance, and allowing the instance to be booted with
  the content of the temporary disk.

  """

  def __init__(self, lu, instance, disks, feedback_fn,
               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
    """Remember the arguments; nothing is created before ``__enter__``.

    @type lu: L{ganeti.cmdlib.base.LogicalUnit}
    @param lu: The LU within which the disks are created.

    @type instance: L{ganeti.objects.Instance}
    @param instance: The instance to which the disks should be added

    @type disks: list of triples (disk template, disk access mode, int)
    @param disks:
      disk specification, which is a list of triples containing the
      disk template (e.g., L{constants.DT_PLAIN}), the disk access
      mode (i.e., L{constants.DISK_RDONLY} or L{constants.DISK_RDWR}),
      and size in MiB.

    @type feedback_fn: function
    @param feedback_fn: Function used to log progress

    @param shutdown_timeout: timeout handed to the instance shutdown RPC,
      defaulting to L{constants.DEFAULT_SHUTDOWN_TIMEOUT}

    """
    self._feedback_fn = feedback_fn
    self._shutdown_timeout = shutdown_timeout
    self._lu = lu
    self._instance = instance
    self._disks = disks

  def _EnsureInstanceDiskState(self):
    """Shut the instance down and deactivate its disks.

    Creating and destroying the disks requires the instance to be down and
    its disks to be inactive; this is called before either operation.

    """
    instance = self._instance

    # The configuration only records the desired state, so the shutdown is
    # always requested through an RPC instead of trusting the configuration
    self._feedback_fn("Shutting down instance")
    shutdown = self._lu.rpc.call_instance_shutdown(instance.primary_node,
                                                   instance,
                                                   self._shutdown_timeout,
                                                   self._lu.op.reason)
    shutdown.Raise("Shutdown of instance '%s' while removing temporary disk "
                   "failed" % instance.name)

    # The disks_active configuration entry should match the actual state,
    # so there is nothing left to do when it is not set
    if not instance.disks_active:
      return

    self._feedback_fn("Deactivating disks")
    ShutdownInstanceDisks(self._lu, instance)

  def __enter__(self):
    """Context manager entry function, creating the disks.

    @rtype: list of L{ganeti.objects.Disk}
    @return: The disk objects created, one per entry of the disk
      specification and in the same order.

    """
    self._EnsureInstanceDiskState()

    lu = self._lu
    created = []

    # No iv_name is assigned on purpose, diverging from Ganeti's standards:
    # the disk should be very temporary and its presence should be reported.
    # gnt-cluster verify detects such a disk and warns the user about it.
    # Removing the disk restores the instance to its proper state, despite
    # an error that appears when the removal is performed.
    for (template, access_mode, size_mib) in self._disks:
      disk = objects.Disk()
      disk.dev_type = template
      disk.mode = access_mode
      disk.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disk.logical_id = (lu.cfg.GetVGName(), disk.uuid)
      disk.params = {}
      disk.size = size_mib

      created.append(disk)

    self._feedback_fn("Attempting to create temporary disk")

    # kept for __exit__, which needs it to undo the creation
    self._undoing_info = CreateDisks(lu, self._instance, disks=created)
    for position, disk in enumerate(created):
      lu.cfg.AddInstanceDisk(self._instance.uuid, disk, idx=position)
    # the configuration changed, so fetch the instance object again
    self._instance = lu.cfg.GetInstanceInfo(self._instance.uuid)

    self._feedback_fn("Temporary disk created")

    self._new_disks = created

    return created

  def __exit__(self, exc_type, _value, _traceback):
    """Context manager exit function, destroying the disks.

    Nothing is returned, so an exception raised inside the ``with`` block
    is not suppressed.

    """
    if exc_type:
      notice = "Exception raised, cleaning up temporary disk"
    else:
      notice = "Regular cleanup of temporary disk"
    self._feedback_fn(notice)

    lu = self._lu
    try:
      self._EnsureInstanceDiskState()

      _UndoCreateDisks(lu, self._undoing_info, self._instance)

      for disk in self._new_disks:
        lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid)
      self._instance = lu.cfg.GetInstanceInfo(self._instance.uuid)

      self._feedback_fn("Temporary disk removed")
    except:
      # catch-all on purpose: report the failed cleanup, then re-raise as is
      self._feedback_fn("Disk cleanup failed; it will have to be removed "
                        "manually")
      raise
3144