22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43 CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
55 constants.DT_FILE: ".file",
56 constants.DT_SHARED_FILE: ".sharedfile",
57 }
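# Illustrative note (addition, not in the original module): these prefixes are
# combined by GenerateDiskTemplate() below with a generated unique ID and a
# disk index, so e.g. an RBD disk 0 ends up named "<unique-id>.rbd.disk0",
# while a plain LV is simply "<unique-id>.disk0".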
62 """Create a single block device on a given node.
63
64 This will not recurse over children of the device, so they must be
65 created in advance.
66
67 @param lu: the lu on whose behalf we execute
68 @param node_uuid: the node on which to create the device
69 @type instance: L{objects.Instance}
70 @param instance: the instance which owns the device
71 @type device: L{objects.Disk}
72 @param device: the device to create
73 @param info: the extra 'metadata' we should attach to the device
74 (this will be represented as an LVM tag)
75 @type force_open: boolean
76 @param force_open: this parameter will be passed to the
77 L{backend.BlockdevCreate} function where it specifies
78 whether we run on primary or not, and it affects both
79 the child assembly and the device's own Open() execution
80 @type excl_stor: boolean
81 @param excl_stor: Whether exclusive_storage is active for the node
82
83 """
84 lu.cfg.SetDiskID(device, node_uuid)
85 result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
86 instance.name, force_open, info,
87 excl_stor)
88 result.Raise("Can't create block device %s on"
89 " node %s for instance %s" % (device,
90 lu.cfg.GetNodeName(node_uuid),
91 instance.name))
92 if device.physical_id is None:
93 device.physical_id = result.payload
94
95
96 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
97 info, force_open, excl_stor):
98 """Create a tree of block devices on a given node.
99
100 If this device type has to be created on secondaries, create it and
101 all its children.
102
103 If not, just recurse to children keeping the same 'force' value.
104
105 @attention: The device has to be annotated already.
106
107 @param lu: the lu on whose behalf we execute
108 @param node_uuid: the node on which to create the device
109 @type instance: L{objects.Instance}
110 @param instance: the instance which owns the device
111 @type device: L{objects.Disk}
112 @param device: the device to create
113 @type force_create: boolean
114 @param force_create: whether to force creation of this device; this
115 will be changed to True whenever we find a device which has the
116 CreateOnSecondary() attribute
116 CreateOnSecondary() attribute
117 @param info: the extra 'metadata' we should attach to the device
118 (this will be represented as a LVM tag)
119 @type force_open: boolean
120 @param force_open: this parameter will be passed to the
121 L{backend.BlockdevCreate} function where it specifies
122 whether we run on primary or not, and it affects both
123 the child assembly and the device's own Open() execution
124 @type excl_stor: boolean
125 @param excl_stor: Whether exclusive_storage is active for the node
126
127 @return: list of created devices
128 """
129 created_devices = []
130 try:
131 if device.CreateOnSecondary():
132 force_create = True
133
134 if device.children:
135 for child in device.children:
136 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
137 force_create, info, force_open, excl_stor)
138 created_devices.extend(devs)
139
140 if not force_create:
141 return created_devices
142
143 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
144 excl_stor)
145
146
147 created_devices = [(node_uuid, device)]
148 return created_devices
149
150 except errors.DeviceCreationError, e:
151 e.created_devices.extend(created_devices)
152 raise e
153 except errors.OpExecError, e:
154 raise errors.DeviceCreationError(str(e), created_devices)
155
158 """Whether exclusive_storage is in effect for the given node.
159
160 @type cfg: L{config.ConfigWriter}
161 @param cfg: The cluster configuration
162 @type node_uuid: string
163 @param node_uuid: The node UUID
164 @rtype: bool
165 @return: The effective value of exclusive_storage
166 @raise errors.OpPrereqError: if no node exists with the given UUID
167
168 """
169 ni = cfg.GetNodeInfo(node_uuid)
170 if ni is None:
171 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
172 errors.ECODE_NOENT)
173 return IsExclusiveStorageEnabledNode(cfg, ni)
174
175
176 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
177 force_open):
178 """Wrapper around L{_CreateBlockDevInner}.
179
180 This method annotates the root device first.
181
182 """
183 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
184 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
185 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
186 force_open, excl_stor)
187
190 """Undo the work performed by L{CreateDisks}.
191
192 This function is called in case of an error to undo the work of
193 L{CreateDisks}.
194
195 @type lu: L{LogicalUnit}
196 @param lu: the logical unit on whose behalf we execute
197 @param disks_created: the result returned by L{CreateDisks}
198
199 """
200 for (node_uuid, disk) in disks_created:
201 lu.cfg.SetDiskID(disk, node_uuid)
202 result = lu.rpc.call_blockdev_remove(node_uuid, disk)
203 result.Warn("Failed to remove newly-created disk %s on node %s" %
204 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
205
206
207 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208 """Create all disks for an instance.
209
210 This abstracts away some work from AddInstance.
211
212 @type lu: L{LogicalUnit}
213 @param lu: the logical unit on whose behalf we execute
214 @type instance: L{objects.Instance}
215 @param instance: the instance whose disks we should create
216 @type to_skip: list
217 @param to_skip: list of indices to skip
218 @type target_node_uuid: string
219 @param target_node_uuid: if passed, overrides the target node for creation
220 @type disks: list of L{objects.Disk}
221 @param disks: the disks to create; if not specified, all the disks of the
222 instance are created
223 @return: information about the created disks, to be used to call
224 L{_UndoCreateDisks}
225 @raise errors.OpPrereqError: in case of error
226
227 """
228 info = GetInstanceInfoText(instance)
229 if target_node_uuid is None:
230 pnode_uuid = instance.primary_node
231 all_node_uuids = instance.all_nodes
232 else:
233 pnode_uuid = target_node_uuid
234 all_node_uuids = [pnode_uuid]
235
236 if disks is None:
237 disks = instance.disks
238
239 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
240
241 if instance.disk_template in constants.DTS_FILEBASED:
242 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
243 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
244
245 result.Raise("Failed to create directory '%s' on"
246 " node %s" % (file_storage_dir,
247 lu.cfg.GetNodeName(pnode_uuid)))
248
249 disks_created = []
250 for idx, device in enumerate(disks):
251 if to_skip and idx in to_skip:
252 continue
253 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
254 for node_uuid in all_node_uuids:
255 f_create = node_uuid == pnode_uuid
256 try:
257 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
258 f_create)
259 disks_created.append((node_uuid, device))
260 except errors.DeviceCreationError, e:
261 logging.warning("Creating disk %s for instance '%s' failed",
262 idx, instance.name)
263 disks_created.extend(e.created_devices)
264 _UndoCreateDisks(lu, disks_created)
265 raise errors.OpExecError(e.message)
266 return disks_created
267
270 """Compute disk size requirements in the volume group
271
272 """
273 def _compute(disks, payload):
274 """Universal algorithm.
275
276 """
277 vgs = {}
278 for disk in disks:
279 vgs[disk[constants.IDISK_VG]] = \
280 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
281
282 return vgs
283
284
285 req_size_dict = {
286 constants.DT_DISKLESS: {},
287 constants.DT_PLAIN: _compute(disks, 0),
288
289 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
290 constants.DT_FILE: {},
291 constants.DT_SHARED_FILE: {},
292 }
293
294 if disk_template not in req_size_dict:
295 raise errors.ProgrammerError("Disk template '%s' size requirement"
296 " is unknown" % disk_template)
297
298 return req_size_dict[disk_template]
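# Worked example (illustrative addition): for two DRBD8 disks of 1024 MiB each
# in volume group "xenvg", _compute() above yields
# {"xenvg": 2 * (1024 + constants.DRBD_META_SIZE)}, i.e. the DRBD metadata
# volume is accounted for once per disk on top of the requested data size.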
299
302 """Computes the instance disks.
303
304 @param op: The instance opcode
305 @param default_vg: The default_vg to assume
306
307 @return: The computed disks
308
309 """
310 disks = []
311 for disk in op.disks:
312 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
313 if mode not in constants.DISK_ACCESS_SET:
314 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
315 mode, errors.ECODE_INVAL)
316 size = disk.get(constants.IDISK_SIZE, None)
317 if size is None:
318 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
319 try:
320 size = int(size)
321 except (TypeError, ValueError):
322 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
323 errors.ECODE_INVAL)
324
325 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
326 if ext_provider and op.disk_template != constants.DT_EXT:
327 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
328 " disk template, not %s" %
329 (constants.IDISK_PROVIDER, constants.DT_EXT,
330 op.disk_template), errors.ECODE_INVAL)
331
332 data_vg = disk.get(constants.IDISK_VG, default_vg)
333 name = disk.get(constants.IDISK_NAME, None)
334 if name is not None and name.lower() == constants.VALUE_NONE:
335 name = None
336 new_disk = {
337 constants.IDISK_SIZE: size,
338 constants.IDISK_MODE: mode,
339 constants.IDISK_VG: data_vg,
340 constants.IDISK_NAME: name,
341 }
342
343 for key in [
344 constants.IDISK_METAVG,
345 constants.IDISK_ADOPT,
346 constants.IDISK_SPINDLES,
347 ]:
348 if key in disk:
349 new_disk[key] = disk[key]
350
351
352
353 if op.disk_template == constants.DT_EXT:
354 if ext_provider:
355 new_disk[constants.IDISK_PROVIDER] = ext_provider
356 for key in disk:
357 if key not in constants.IDISK_PARAMS:
358 new_disk[key] = disk[key]
359 else:
360 raise errors.OpPrereqError("Missing provider for template '%s'" %
361 constants.DT_EXT, errors.ECODE_INVAL)
362
363 disks.append(new_disk)
364
365 return disks
366
369 """Compute disk size requirements inside the RADOS cluster.
370
371 """
372
373 pass
374
375
376 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
377 iv_name, p_minor, s_minor):
378 """Generate a drbd8 device complete with its children.
379
380 """
381 assert len(vgnames) == len(names) == 2
382 port = lu.cfg.AllocatePort()
383 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
386 logical_id=(vgnames[0], names[0]),
387 params={})
388 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
390 size=constants.DRBD_META_SIZE,
391 logical_id=(vgnames[1], names[1]),
392 params={})
393 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
395 logical_id=(primary_uuid, secondary_uuid, port,
396 p_minor, s_minor,
397 shared_secret),
398 children=[dev_data, dev_meta],
399 iv_name=iv_name, params={})
400 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401 return drbd_dev
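# Illustrative sketch (addition): the object returned above is a small disk
# tree, roughly
#
#   DT_DRBD8  logical_id=(primary, secondary, port, p_minor, s_minor, secret)
#     +- DT_PLAIN data LV  logical_id=(vgnames[0], names[0]), size MiB
#     +- DT_PLAIN meta LV  logical_id=(vgnames[1], names[1]), DRBD_META_SIZE MiB
#
# which _CreateBlockDevInner() later walks child-first when creating the
# devices on each node.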
402
403
404 def GenerateDiskTemplate(
405 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
406 disk_info, file_storage_dir, file_driver, base_index,
407 feedback_fn, full_disk_params):
408 """Generate the entire disk layout for a given template type.
409
410 """
411 vgname = lu.cfg.GetVGName()
412 disk_count = len(disk_info)
413 disks = []
414
415 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
416
417 if template_name == constants.DT_DISKLESS:
418 pass
419 elif template_name == constants.DT_DRBD8:
420 if len(secondary_node_uuids) != 1:
421 raise errors.ProgrammerError("Wrong template configuration")
422 remote_node_uuid = secondary_node_uuids[0]
423 minors = lu.cfg.AllocateDRBDMinor(
424 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
425
426 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
427 full_disk_params)
428 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
429
430 names = []
431 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
432 for i in range(disk_count)]):
433 names.append(lv_prefix + "_data")
434 names.append(lv_prefix + "_meta")
435 for idx, disk in enumerate(disk_info):
436 disk_index = idx + base_index
437 data_vg = disk.get(constants.IDISK_VG, vgname)
438 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
439 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
440 disk[constants.IDISK_SIZE],
441 [data_vg, meta_vg],
442 names[idx * 2:idx * 2 + 2],
443 "disk/%d" % disk_index,
444 minors[idx * 2], minors[idx * 2 + 1])
445 disk_dev.mode = disk[constants.IDISK_MODE]
446 disk_dev.name = disk.get(constants.IDISK_NAME, None)
447 disks.append(disk_dev)
448 else:
449 if secondary_node_uuids:
450 raise errors.ProgrammerError("Wrong template configuration")
451
452 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
453 if name_prefix is None:
454 names = None
455 else:
456 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
457 (name_prefix, base_index + i)
458 for i in range(disk_count)])
459
460 if template_name == constants.DT_PLAIN:
461
462 def logical_id_fn(idx, _, disk):
463 vg = disk.get(constants.IDISK_VG, vgname)
464 return (vg, names[idx])
465
466 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
467 logical_id_fn = \
468 lambda _, disk_index, disk: (file_driver,
469 "%s/%s" % (file_storage_dir,
470 names[idx]))
471 elif template_name == constants.DT_BLOCK:
472 logical_id_fn = \
473 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
474 disk[constants.IDISK_ADOPT])
475 elif template_name == constants.DT_RBD:
476 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
477 elif template_name == constants.DT_EXT:
478 def logical_id_fn(idx, _, disk):
479 provider = disk.get(constants.IDISK_PROVIDER, None)
480 if provider is None:
481 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
482 " not found", constants.DT_EXT,
483 constants.IDISK_PROVIDER)
484 return (provider, names[idx])
485 else:
486 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
487
488 dev_type = template_name
489
490 for idx, disk in enumerate(disk_info):
491 params = {}
492
493 if template_name == constants.DT_EXT:
494 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
495 for key in disk:
496 if key not in constants.IDISK_PARAMS:
497 params[key] = disk[key]
498 disk_index = idx + base_index
499 size = disk[constants.IDISK_SIZE]
500 feedback_fn("* disk %s, size %s" %
501 (disk_index, utils.FormatUnit(size, "h")))
502 disk_dev = objects.Disk(dev_type=dev_type, size=size,
503 logical_id=logical_id_fn(idx, disk_index, disk),
504 iv_name="disk/%d" % disk_index,
505 mode=disk[constants.IDISK_MODE],
506 params=params,
507 spindles=disk.get(constants.IDISK_SPINDLES))
508 disk_dev.name = disk.get(constants.IDISK_NAME, None)
509 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
510 disks.append(disk_dev)
511
512 return disks
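# Minimal usage sketch (illustrative addition; the argument values are made up):
#
#   disks = GenerateDiskTemplate(
#       lu, constants.DT_PLAIN, instance.uuid, pnode_uuid, [],
#       [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR}],
#       None, None, 0, feedback_fn, {})
#
# would return a single objects.Disk of type DT_PLAIN whose logical_id is
# (<cluster default VG>, "<unique-id>.disk0") and whose iv_name is "disk/0".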
513
516 """Check the presence of the spindle options with exclusive_storage.
517
518 @type diskdict: dict
519 @param diskdict: disk parameters
520 @type es_flag: bool
521 @param es_flag: the effective value of the exclusive_storage flag
522 @type required: bool
523 @param required: whether spindles are required or just optional
524 @raise errors.OpPrereqError: when spindles are given and they should not be
525
526 """
527 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
528 diskdict[constants.IDISK_SPINDLES] is not None):
529 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
530 " when exclusive storage is not active",
531 errors.ECODE_INVAL)
532 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
533 diskdict[constants.IDISK_SPINDLES] is None)):
534 raise errors.OpPrereqError("You must specify spindles in instance disks"
535 " when exclusive storage is active",
536 errors.ECODE_INVAL)
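# Examples (illustrative addition): with exclusive storage disabled, a disk
# dict containing a non-None constants.IDISK_SPINDLES value is rejected; with
# exclusive storage enabled and required=True, a dict lacking that key (or
# carrying None) is rejected as well. All other combinations pass.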
537
540 """Recreate an instance's missing disks.
541
542 """
543 HPATH = "instance-recreate-disks"
544 HTYPE = constants.HTYPE_INSTANCE
545 REQ_BGL = False
546
547 _MODIFYABLE = compat.UniqueFrozenset([
548 constants.IDISK_SIZE,
549 constants.IDISK_MODE,
550 constants.IDISK_SPINDLES,
551 ])
552
553
554 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555 constants.IDISK_ADOPT,
556
557
558 constants.IDISK_VG,
559 constants.IDISK_METAVG,
560 constants.IDISK_PROVIDER,
561 constants.IDISK_NAME,
562 ]))
563
565 """Run the allocator based on input opcode.
566
567 """
568 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587 disk_template = self.instance.disk_template
588 spindle_use = be_full[constants.BE_SPINDLE_USE]
589 disks = [{
590 constants.IDISK_SIZE: d.size,
591 constants.IDISK_MODE: d.mode,
592 constants.IDISK_SPINDLES: d.spindles,
593 } for d in self.instance.disks]
594 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
595 disk_template=disk_template,
596 tags=list(self.instance.GetTags()),
597 os=self.instance.os,
598 nics=[{}],
599 vcpus=be_full[constants.BE_VCPUS],
600 memory=be_full[constants.BE_MAXMEM],
601 spindle_use=spindle_use,
602 disks=disks,
603 hypervisor=self.instance.hypervisor,
604 node_whitelist=None)
605 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
606
607 ial.Run(self.op.iallocator)
608
609 assert req.RequiredNodes() == len(self.instance.all_nodes)
610
611 if not ial.success:
612 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
613 " %s" % (self.op.iallocator, ial.info),
614 errors.ECODE_NORES)
615
616 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
617 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
618 self.op.instance_name, self.op.iallocator,
619 utils.CommaJoin(self.op.nodes))
620
645
661
662 def DeclareLocks(self, level):
663 if level == locking.LEVEL_NODEGROUP:
664 assert self.op.iallocator is not None
665 assert not self.op.nodes
666 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
667 self.share_locks[locking.LEVEL_NODEGROUP] = 1
668
669
670
671 self.needed_locks[locking.LEVEL_NODEGROUP] = \
672 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
673
674 elif level == locking.LEVEL_NODE:
675
676
677
678
679
680 if self.op.iallocator:
681 assert not self.op.nodes
682 assert not self.needed_locks[locking.LEVEL_NODE]
683 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
684
685
686 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
687 self.needed_locks[locking.LEVEL_NODE].extend(
688 self.cfg.GetNodeGroup(group_uuid).members)
689
690 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
691 elif not self.op.nodes:
692 self._LockInstancesNodes(primary_only=False)
693 elif level == locking.LEVEL_NODE_RES:
694
695 self.needed_locks[locking.LEVEL_NODE_RES] = \
696 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
697
705
712
714 """Check prerequisites.
715
716 This checks that the instance is in the cluster and is not running.
717
718 """
719 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
720 assert instance is not None, \
721 "Cannot retrieve locked instance %s" % self.op.instance_name
722 if self.op.node_uuids:
723 if len(self.op.node_uuids) != len(instance.all_nodes):
724 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
725 " %d replacement nodes were specified" %
726 (instance.name, len(instance.all_nodes),
727 len(self.op.node_uuids)),
728 errors.ECODE_INVAL)
729 assert instance.disk_template != constants.DT_DRBD8 or \
730 len(self.op.node_uuids) == 2
731 assert instance.disk_template != constants.DT_PLAIN or \
732 len(self.op.node_uuids) == 1
733 primary_node = self.op.node_uuids[0]
734 else:
735 primary_node = instance.primary_node
736 if not self.op.iallocator:
737 CheckNodeOnline(self, primary_node)
738
739 if instance.disk_template == constants.DT_DISKLESS:
740 raise errors.OpPrereqError("Instance '%s' has no disks" %
741 self.op.instance_name, errors.ECODE_INVAL)
742
743
744 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
745 if owned_groups:
746
747
748 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
749 primary_only=True)
750
751
752
753 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
754 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
755 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
756 msg="cannot recreate disks")
757
758 if self.op.disks:
759 self.disks = dict(self.op.disks)
760 else:
761 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
762
763 maxidx = max(self.disks.keys())
764 if maxidx >= len(instance.disks):
765 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
766 errors.ECODE_INVAL)
767
768 if ((self.op.node_uuids or self.op.iallocator) and
769 sorted(self.disks.keys()) != range(len(instance.disks))):
770 raise errors.OpPrereqError("Can't recreate disks partially and"
771 " change the nodes at the same time",
772 errors.ECODE_INVAL)
773
774 self.instance = instance
775
776 if self.op.iallocator:
777 self._RunAllocator()
778
779 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
780 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
781 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
782
783 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
784
785 if self.op.node_uuids:
786 node_uuids = self.op.node_uuids
787 else:
788 node_uuids = instance.all_nodes
789 excl_stor = compat.any(
790 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
791 )
792 for new_params in self.disks.values():
793 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
794
795 def Exec(self, feedback_fn):
796 """Recreate the disks.
797
798 """
799 assert (self.owned_locks(locking.LEVEL_NODE) ==
800 self.owned_locks(locking.LEVEL_NODE_RES))
801
802 to_skip = []
803 mods = []
804
805 for idx, disk in enumerate(self.instance.disks):
806 try:
807 changes = self.disks[idx]
808 except KeyError:
809
810 to_skip.append(idx)
811 continue
812
813
814 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
815
816 assert len(self.op.node_uuids) == 2
817 assert len(disk.logical_id) == 6
818
819 (_, _, old_port, _, _, old_secret) = disk.logical_id
820 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
821 self.instance.uuid)
822 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
823 new_minors[0], new_minors[1], old_secret)
824 assert len(disk.logical_id) == len(new_id)
825 else:
826 new_id = None
827
828 mods.append((idx, new_id, changes))
829
830
831
832 for idx, new_id, changes in mods:
833 disk = self.instance.disks[idx]
834 if new_id is not None:
835 assert disk.dev_type == constants.DT_DRBD8
836 disk.logical_id = new_id
837 if changes:
838 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
839 mode=changes.get(constants.IDISK_MODE, None),
840 spindles=changes.get(constants.IDISK_SPINDLES, None))
841
842
843 if self.op.node_uuids:
844 self.instance.primary_node = self.op.node_uuids[0]
845 self.LogWarning("Changing the instance's nodes, you will have to"
846 " remove any disks left on the older nodes manually")
847
848 if self.op.node_uuids:
849 self.cfg.Update(self.instance, feedback_fn)
850
851
852 mylocks = self.owned_locks(locking.LEVEL_NODE)
853 assert mylocks.issuperset(frozenset(self.instance.all_nodes))
854 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
855
856
857 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
858 wipedisks = [(idx, disk, 0)
859 for (idx, disk) in enumerate(self.instance.disks)
860 if idx not in to_skip]
861 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
862 cleanup=new_disks)
863
884
887 """Checks the vg capacity for a given node.
888
889 @type node_info: tuple (_, list of dicts, _)
890 @param node_info: the result of the node info call for one node
891 @type node_name: string
892 @param node_name: the name of the node
893 @type vg: string
894 @param vg: volume group name
895 @type requested: int
896 @param requested: the amount of disk in MiB to check for
897 @raise errors.OpPrereqError: if the node doesn't have enough disk,
898 or we cannot check the node
899
900 """
901 (_, space_info, _) = node_info
902 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
903 space_info, constants.ST_LVM_VG)
904 if not lvm_vg_info:
905 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
906 vg_free = lvm_vg_info.get("storage_free", None)
907 if not isinstance(vg_free, int):
908 raise errors.OpPrereqError("Can't compute free disk space on node"
909 " %s for vg %s, result was '%s'" %
910 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
911 if requested > vg_free:
912 raise errors.OpPrereqError("Not enough disk space on target node %s"
913 " vg %s: required %d MiB, available %d MiB" %
914 (node_name, vg, requested, vg_free),
915 errors.ECODE_NORES)
916
919 """Checks if nodes have enough free disk space in the specified VG.
920
921 This function checks if all given nodes have the needed amount of
922 free disk. In case any node has less disk or we cannot get the
923 information from the node, this function raises an OpPrereqError
924 exception.
925
926 @type lu: C{LogicalUnit}
927 @param lu: a logical unit from which we get configuration data
928 @type node_uuids: C{list}
929 @param node_uuids: the list of node UUIDs to check
930 @type vg: C{str}
931 @param vg: the volume group to check
932 @type requested: C{int}
933 @param requested: the amount of disk in MiB to check for
934 @raise errors.OpPrereqError: if the node doesn't have enough disk,
935 or we cannot check the node
936
937 """
938 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
939 for node in node_uuids:
940 node_name = lu.cfg.GetNodeName(node)
941 info = nodeinfo[node]
942 info.Raise("Cannot get current information from node %s" % node_name,
943 prereq=True, ecode=errors.ECODE_ENVIRON)
944 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
945
948 """Checks if nodes have enough free disk space in all the VGs.
949
950 This function checks if all given nodes have the needed amount of
951 free disk. In case any node has less disk or we cannot get the
952 information from the node, this function raises an OpPrereqError
953 exception.
954
955 @type lu: C{LogicalUnit}
956 @param lu: a logical unit from which we get configuration data
957 @type node_uuids: C{list}
958 @param node_uuids: the list of node UUIDs to check
959 @type req_sizes: C{dict}
960 @param req_sizes: the hash of vg and corresponding amount of disk in
961 MiB to check for
962 @raise errors.OpPrereqError: if the node doesn't have enough disk,
963 or we cannot check the node
964
965 """
966 for vg, req_size in req_sizes.items():
967 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
968
971 """Converts a disk size in bytes to mebibytes.
972
973 Warns and rounds up if the size isn't an even multiple of 1 MiB.
974
975 """
976 (mib, remainder) = divmod(size, 1024 * 1024)
977
978 if remainder != 0:
979 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
980 " to not overwrite existing data (%s bytes will not be"
981 " wiped)", (1024 * 1024) - remainder)
982 mib += 1
983
984 return mib
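# Worked example (illustrative addition): a size of 1073741825 bytes
# (1 GiB + 1 byte) gives divmod() = (1024, 1), so a warning is logged and
# 1025 MiB is returned; the trailing partial mebibyte of existing data is
# thereby kept outside the region that gets wiped.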
985
986
987 def _CalcEta(time_taken, written, total_size):
988 """Calculates the ETA based on size written and total size.
989
990 @param time_taken: The time taken so far
991 @param written: amount written so far
992 @param total_size: The total size of data to be written
993 @return: The remaining time in seconds
994
995 """
996 avg_time = time_taken / float(written)
997 return (total_size - written) * avg_time
998
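# Worked example (illustrative addition): if 512 MiB out of 2048 MiB were
# written in 30 seconds, _CalcEta(30, 512, 2048) uses avg_time = 30 / 512.0
# and returns (2048 - 512) * 0.0586 ~= 90 seconds remaining.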
999
1000 def WipeDisks(lu, instance, disks=None):
1001 """Wipes instance disks.
1002
1003 @type lu: L{LogicalUnit}
1004 @param lu: the logical unit on whose behalf we execute
1005 @type instance: L{objects.Instance}
1006 @param instance: the instance whose disks we should wipe
1007 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1008 @param disks: Disk details; tuple contains disk index, disk object and the
1009 start offset
1010
1011 """
1012 node_uuid = instance.primary_node
1013 node_name = lu.cfg.GetNodeName(node_uuid)
1014
1015 if disks is None:
1016 disks = [(idx, disk, 0)
1017 for (idx, disk) in enumerate(instance.disks)]
1018
1019 for (_, device, _) in disks:
1020 lu.cfg.SetDiskID(device, node_uuid)
1021
1022 logging.info("Pausing synchronization of disks of instance '%s'",
1023 instance.name)
1024 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1025 (map(compat.snd, disks),
1026 instance),
1027 True)
1028 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1029
1030 for idx, success in enumerate(result.payload):
1031 if not success:
1032 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1033 " failed", idx, instance.name)
1034
1035 try:
1036 for (idx, device, offset) in disks:
1037
1038
1039 wipe_chunk_size = \
1040 int(min(constants.MAX_WIPE_CHUNK,
1041 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
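# Illustrative note (addition; the exact constant values are an assumption):
# the chunk is MIN_WIPE_CHUNK_PERCENT percent of the device size, capped at
# MAX_WIPE_CHUNK, so e.g. a 2048 MiB disk with a 10% minimum percentage would
# be wiped in requests of roughly 204 MiB each.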
1042
1043 size = device.size
1044 last_output = 0
1045 start_time = time.time()
1046
1047 if offset == 0:
1048 info_text = ""
1049 else:
1050 info_text = (" (from %s to %s)" %
1051 (utils.FormatUnit(offset, "h"),
1052 utils.FormatUnit(size, "h")))
1053
1054 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1055
1056 logging.info("Wiping disk %d for instance %s on node %s using"
1057 " chunk size %s", idx, instance.name, node_name,
1058 wipe_chunk_size)
1059
1060 while offset < size:
1061 wipe_size = min(wipe_chunk_size, size - offset)
1062
1063 logging.debug("Wiping disk %d, offset %s, chunk %s",
1064 idx, offset, wipe_size)
1065
1066 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1067 offset, wipe_size)
1068 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069 (idx, offset, wipe_size))
1070
1071 now = time.time()
1072 offset += wipe_size
1073 if now - last_output >= 60:
1074 eta = _CalcEta(now - start_time, offset, size)
1075 lu.LogInfo(" - done: %.1f%% ETA: %s",
1076 offset / float(size) * 100, utils.FormatSeconds(eta))
1077 last_output = now
1078 finally:
1079 logging.info("Resuming synchronization of disks for instance '%s'",
1080 instance.name)
1081
1082 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083 (map(compat.snd, disks),
1084 instance),
1085 False)
1086
1087 if result.fail_msg:
1088 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089 node_name, result.fail_msg)
1090 else:
1091 for idx, success in enumerate(result.payload):
1092 if not success:
1093 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094 " failed", idx, instance.name)
1095
1098 """Wrapper for L{WipeDisks} that handles errors.
1099
1100 @type lu: L{LogicalUnit}
1101 @param lu: the logical unit on whose behalf we execute
1102 @type instance: L{objects.Instance}
1103 @param instance: the instance whose disks we should wipe
1104 @param disks: see L{WipeDisks}
1105 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1106 case of error
1107 @raise errors.OpPrereqError: in case of failure
1108
1109 """
1110 try:
1111 WipeDisks(lu, instance, disks=disks)
1112 except errors.OpExecError:
1113 logging.warning("Wiping disks for instance '%s' failed",
1114 instance.name)
1115 _UndoCreateDisks(lu, cleanup)
1116 raise
1117
1120 """Return the instance disks selected by the disks list
1121
1122 @type disks: list of L{objects.Disk} or None
1123 @param disks: selected disks
1124 @rtype: list of L{objects.Disk}
1125 @return: selected instance disks to act on
1126
1127 """
1128 if disks is None:
1129 return instance.disks
1130 else:
1131 if not set(disks).issubset(instance.disks):
1132 raise errors.ProgrammerError("Can only act on disks belonging to the"
1133 " target instance: expected a subset of %r,"
1134 " got %r" % (instance.disks, disks))
1135 return disks
1136
1137
1138 def WaitForSync(lu, instance, disks=None, oneshot=False):
1139 """Sleep and poll for an instance's disk to sync.
1140
1141 """
1142 if not instance.disks or disks is not None and not disks:
1143 return True
1144
1145 disks = ExpandCheckDisks(instance, disks)
1146
1147 if not oneshot:
1148 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1149
1150 node_uuid = instance.primary_node
1151 node_name = lu.cfg.GetNodeName(node_uuid)
1152
1153 for dev in disks:
1154 lu.cfg.SetDiskID(dev, node_uuid)
1155
1156
1157
1158 retries = 0
1159 degr_retries = 10
1160 while True:
1161 max_time = 0
1162 done = True
1163 cumul_degraded = False
1164 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1165 msg = rstats.fail_msg
1166 if msg:
1167 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1168 retries += 1
1169 if retries >= 10:
1170 raise errors.RemoteError("Can't contact node %s for mirror data,"
1171 " aborting." % node_name)
1172 time.sleep(6)
1173 continue
1174 rstats = rstats.payload
1175 retries = 0
1176 for i, mstat in enumerate(rstats):
1177 if mstat is None:
1178 lu.LogWarning("Can't compute data for node %s/%s",
1179 node_name, disks[i].iv_name)
1180 continue
1181
1182 cumul_degraded = (cumul_degraded or
1183 (mstat.is_degraded and mstat.sync_percent is None))
1184 if mstat.sync_percent is not None:
1185 done = False
1186 if mstat.estimated_time is not None:
1187 rem_time = ("%s remaining (estimated)" %
1188 utils.FormatSeconds(mstat.estimated_time))
1189 max_time = mstat.estimated_time
1190 else:
1191 rem_time = "no time estimate"
1192 lu.LogInfo("- device %s: %5.2f%% done, %s",
1193 disks[i].iv_name, mstat.sync_percent, rem_time)
1194
1195
1196
1197
1198 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1199 logging.info("Degraded disks found, %d retries left", degr_retries)
1200 degr_retries -= 1
1201 time.sleep(1)
1202 continue
1203
1204 if done or oneshot:
1205 break
1206
1207 time.sleep(min(60, max_time))
1208
1209 if done:
1210 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1211
1212 return not cumul_degraded
1213
1216 """Shutdown block devices of an instance.
1217
1218 This does the shutdown on all nodes of the instance.
1219
1220 If ignore_primary is false, errors on the primary node are not
1221 ignored; if it is true, they only produce a warning.
1222
1223 """
1224 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1225 all_result = True
1226 disks = ExpandCheckDisks(instance, disks)
1227
1228 for disk in disks:
1229 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1230 lu.cfg.SetDiskID(top_disk, node_uuid)
1231 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232 msg = result.fail_msg
1233 if msg:
1234 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236 if ((node_uuid == instance.primary_node and not ignore_primary) or
1237 (node_uuid != instance.primary_node and not result.offline)):
1238 all_result = False
1239 return all_result
1240
1251
1252
1253 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254 ignore_size=False):
1255 """Prepare the block devices for an instance.
1256
1257 This sets up the block devices on all nodes.
1258
1259 @type lu: L{LogicalUnit}
1260 @param lu: the logical unit on whose behalf we execute
1261 @type instance: L{objects.Instance}
1262 @param instance: the instance for whose disks we assemble
1263 @type disks: list of L{objects.Disk} or None
1264 @param disks: which disks to assemble (or all, if None)
1265 @type ignore_secondaries: boolean
1266 @param ignore_secondaries: if true, errors on secondary nodes
1267 won't result in an error return from the function
1268 @type ignore_size: boolean
1269 @param ignore_size: if true, the current known size of the disk
1270 will not be used during the disk activation, useful for cases
1271 when the size is wrong
1272 @return: False if the operation failed, otherwise a list of
1273 (host, instance_visible_name, node_visible_name)
1274 with the mapping from node devices to instance devices
1275
1276 """
1277 device_info = []
1278 disks_ok = True
1279 disks = ExpandCheckDisks(instance, disks)
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292 lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293
1294
1295 for idx, inst_disk in enumerate(disks):
1296 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297 instance.primary_node):
1298 if ignore_size:
1299 node_disk = node_disk.Copy()
1300 node_disk.UnsetSize()
1301 lu.cfg.SetDiskID(node_disk, node_uuid)
1302 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303 instance.name, False, idx)
1304 msg = result.fail_msg
1305 if msg:
1306 is_offline_secondary = (node_uuid in instance.secondary_nodes and
1307 result.offline)
1308 lu.LogWarning("Could not prepare block device %s on node %s"
1309 " (is_primary=False, pass=1): %s",
1310 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311 if not (ignore_secondaries or is_offline_secondary):
1312 disks_ok = False
1313
1314
1315
1316
1317 for idx, inst_disk in enumerate(disks):
1318 dev_path = None
1319
1320 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321 instance.primary_node):
1322 if node_uuid != instance.primary_node:
1323 continue
1324 if ignore_size:
1325 node_disk = node_disk.Copy()
1326 node_disk.UnsetSize()
1327 lu.cfg.SetDiskID(node_disk, node_uuid)
1328 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329 instance.name, True, idx)
1330 msg = result.fail_msg
1331 if msg:
1332 lu.LogWarning("Could not prepare block device %s on node %s"
1333 " (is_primary=True, pass=2): %s",
1334 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1335 disks_ok = False
1336 else:
1337 dev_path = result.payload
1338
1339 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340 inst_disk.iv_name, dev_path))
1341
1342
1343
1344
1345 for disk in disks:
1346 lu.cfg.SetDiskID(disk, instance.primary_node)
1347
1348 if not disks_ok:
1349 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1350
1351 return disks_ok, device_info
1352
1355 """Start the disks of an instance.
1356
1357 """
1358 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1359 ignore_secondaries=force)
1360 if not disks_ok:
1361 ShutdownInstanceDisks(lu, instance)
1362 if force is not None and not force:
1363 lu.LogWarning("",
1364 hint=("If the message above refers to a secondary node,"
1365 " you can retry the operation using '--force'"))
1366 raise errors.OpExecError("Disk consistency error")
1367
1370 """Grow a disk of an instance.
1371
1372 """
1373 HPATH = "disk-grow"
1374 HTYPE = constants.HTYPE_INSTANCE
1375 REQ_BGL = False
1376
1383
1391
1393 """Build hooks env.
1394
1395 This runs on the master, the primary and all the secondaries.
1396
1397 """
1398 env = {
1399 "DISK": self.op.disk,
1400 "AMOUNT": self.op.amount,
1401 "ABSOLUTE": self.op.absolute,
1402 }
1403 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1404 return env
1405
1412
1414 """Check prerequisites.
1415
1416 This checks that the instance is in the cluster.
1417
1418 """
1419 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1420 assert self.instance is not None, \
1421 "Cannot retrieve locked instance %s" % self.op.instance_name
1422 node_uuids = list(self.instance.all_nodes)
1423 for node_uuid in node_uuids:
1424 CheckNodeOnline(self, node_uuid)
1425 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1426
1427 if self.instance.disk_template not in constants.DTS_GROWABLE:
1428 raise errors.OpPrereqError("Instance's disk layout does not support"
1429 " growing", errors.ECODE_INVAL)
1430
1431 self.disk = self.instance.FindDisk(self.op.disk)
1432
1433 if self.op.absolute:
1434 self.target = self.op.amount
1435 self.delta = self.target - self.disk.size
1436 if self.delta < 0:
1437 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1438 "current disk size (%s)" %
1439 (utils.FormatUnit(self.target, "h"),
1440 utils.FormatUnit(self.disk.size, "h")),
1441 errors.ECODE_STATE)
1442 else:
1443 self.delta = self.op.amount
1444 self.target = self.disk.size + self.delta
1445 if self.delta < 0:
1446 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1447 utils.FormatUnit(self.delta, "h"),
1448 errors.ECODE_INVAL)
1449
1450 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1451
1462
1463 def Exec(self, feedback_fn):
1464 """Execute disk grow.
1465
1466 """
1467 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1468 assert (self.owned_locks(locking.LEVEL_NODE) ==
1469 self.owned_locks(locking.LEVEL_NODE_RES))
1470
1471 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1472
1473 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1474 if not disks_ok:
1475 raise errors.OpExecError("Cannot activate block device to grow")
1476
1477 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1478 (self.op.disk, self.instance.name,
1479 utils.FormatUnit(self.delta, "h"),
1480 utils.FormatUnit(self.target, "h")))
1481
1482
1483 for node_uuid in self.instance.all_nodes:
1484 self.cfg.SetDiskID(self.disk, node_uuid)
1485 result = self.rpc.call_blockdev_grow(node_uuid,
1486 (self.disk, self.instance),
1487 self.delta, True, True,
1488 self.node_es_flags[node_uuid])
1489 result.Raise("Dry-run grow request failed to node %s" %
1490 self.cfg.GetNodeName(node_uuid))
1491
1492 if wipe_disks:
1493
1494 self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1495 result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1496 [self.disk])
1497 result.Raise("Failed to retrieve disk size from node '%s'" %
1498 self.instance.primary_node)
1499
1500 (disk_dimensions, ) = result.payload
1501
1502 if disk_dimensions is None:
1503 raise errors.OpExecError("Failed to retrieve disk size from primary"
1504 " node '%s'" % self.instance.primary_node)
1505 (disk_size_in_bytes, _) = disk_dimensions
1506
1507 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1508
1509 assert old_disk_size >= self.disk.size, \
1510 ("Retrieved disk size too small (got %s, should be at least %s)" %
1511 (old_disk_size, self.disk.size))
1512 else:
1513 old_disk_size = None
1514
1515
1516
1517 for node_uuid in self.instance.all_nodes:
1518 self.cfg.SetDiskID(self.disk, node_uuid)
1519 result = self.rpc.call_blockdev_grow(node_uuid,
1520 (self.disk, self.instance),
1521 self.delta, False, True,
1522 self.node_es_flags[node_uuid])
1523 result.Raise("Grow request failed to node %s" %
1524 self.cfg.GetNodeName(node_uuid))
1525
1526
1527 node_uuid = self.instance.primary_node
1528 self.cfg.SetDiskID(self.disk, node_uuid)
1529 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1530 self.delta, False, False,
1531 self.node_es_flags[node_uuid])
1532 result.Raise("Grow request failed to node %s" %
1533 self.cfg.GetNodeName(node_uuid))
1534
1535 self.disk.RecordGrow(self.delta)
1536 self.cfg.Update(self.instance, feedback_fn)
1537
1538
1539 ReleaseLocks(self, locking.LEVEL_NODE)
1540
1541
1542 self.glm.downgrade(locking.LEVEL_INSTANCE)
1543
1544 assert wipe_disks ^ (old_disk_size is None)
1545
1546 if wipe_disks:
1547 assert self.instance.disks[self.op.disk] == self.disk
1548
1549
1550 WipeDisks(self, self.instance,
1551 disks=[(self.op.disk, self.disk, old_disk_size)])
1552
1553 if self.op.wait_for_sync:
1554 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1555 if disk_abort:
1556 self.LogWarning("Disk syncing has not returned a good status; check"
1557 " the instance")
1558 if not self.instance.disks_active:
1559 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1560 elif not self.instance.disks_active:
1561 self.LogWarning("Not shutting down the disk even if the instance is"
1562 " not supposed to be running because no wait for"
1563 " sync mode was requested")
1564
1565 assert self.owned_locks(locking.LEVEL_NODE_RES)
1566 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1567
1570 """Replace the disks of an instance.
1571
1572 """
1573 HPATH = "mirrors-replace"
1574 HTYPE = constants.HTYPE_INSTANCE
1575 REQ_BGL = False
1576
1594
1595 def ExpandNames(self):
1596 self._ExpandAndLockInstance()
1597
1598 assert locking.LEVEL_NODE not in self.needed_locks
1599 assert locking.LEVEL_NODE_RES not in self.needed_locks
1600 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1601
1602 assert self.op.iallocator is None or self.op.remote_node is None, \
1603 "Conflicting options"
1604
1605 if self.op.remote_node is not None:
1606 (self.op.remote_node_uuid, self.op.remote_node) = \
1607 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1608 self.op.remote_node)
1609
1610
1611
1612
1613
1614 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1615 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1616 else:
1617 self.needed_locks[locking.LEVEL_NODE] = []
1618 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1619
1620 if self.op.iallocator is not None:
1621
1622 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1623 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1624
1625 self.needed_locks[locking.LEVEL_NODE_RES] = []
1626
1627 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1628 self.op.instance_name, self.op.mode,
1629 self.op.iallocator, self.op.remote_node_uuid,
1630 self.op.disks, self.op.early_release,
1631 self.op.ignore_ipolicy)
1632
1633 self.tasklets = [self.replacer]
1634
1635 def DeclareLocks(self, level):
1636 if level == locking.LEVEL_NODEGROUP:
1637 assert self.op.remote_node_uuid is None
1638 assert self.op.iallocator is not None
1639 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1640
1641 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1642
1643
1644 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1645 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1646
1647 elif level == locking.LEVEL_NODE:
1648 if self.op.iallocator is not None:
1649 assert self.op.remote_node_uuid is None
1650 assert not self.needed_locks[locking.LEVEL_NODE]
1651 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1652
1653
1654 self.needed_locks[locking.LEVEL_NODE] = \
1655 [node_uuid
1656 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1657 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1658 else:
1659 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1660
1661 self._LockInstancesNodes()
1662
1663 elif level == locking.LEVEL_NODE_RES:
1664
1665 self.needed_locks[locking.LEVEL_NODE_RES] = \
1666 self.needed_locks[locking.LEVEL_NODE]
1667
1682
1684 """Build hooks nodes.
1685
1686 """
1687 instance = self.replacer.instance
1688 nl = [
1689 self.cfg.GetMasterNode(),
1690 instance.primary_node,
1691 ]
1692 if self.op.remote_node_uuid is not None:
1693 nl.append(self.op.remote_node_uuid)
1694 return nl, nl
1695
1709
1712 """Bring up an instance's disks.
1713
1714 """
1715 REQ_BGL = False
1716
1721
1725
1727 """Check prerequisites.
1728
1729 This checks that the instance is in the cluster.
1730
1731 """
1732 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733 assert self.instance is not None, \
1734 "Cannot retrieve locked instance %s" % self.op.instance_name
1735 CheckNodeOnline(self, self.instance.primary_node)
1736
1737 def Exec(self, feedback_fn):
1753
1756 """Shutdown an instance's disks.
1757
1758 """
1759 REQ_BGL = False
1760
1765
1769
1771 """Check prerequisites.
1772
1773 This checks that the instance is in the cluster.
1774
1775 """
1776 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1777 assert self.instance is not None, \
1778 "Cannot retrieve locked instance %s" % self.op.instance_name
1779
1780 def Exec(self, feedback_fn):
1788
1792 """Check that mirrors are not degraded.
1793
1794 @attention: The device has to be annotated already.
1795
1796 The ldisk parameter, if True, will change the test from the
1797 is_degraded attribute (which represents overall non-ok status for
1798 the device(s)) to the ldisk (representing the local storage status).
1799
1800 """
1801 lu.cfg.SetDiskID(dev, node_uuid)
1802
1803 result = True
1804
1805 if on_primary or dev.AssembleOnSecondary():
1806 rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1807 msg = rstats.fail_msg
1808 if msg:
1809 lu.LogWarning("Can't find disk on node %s: %s",
1810 lu.cfg.GetNodeName(node_uuid), msg)
1811 result = False
1812 elif not rstats.payload:
1813 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1814 result = False
1815 else:
1816 if ldisk:
1817 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1818 else:
1819 result = result and not rstats.payload.is_degraded
1820
1821 if dev.children:
1822 for child in dev.children:
1823 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1824 node_uuid, on_primary)
1825
1826 return result
1827
1836
1839 """Wrapper around call_blockdev_find to annotate diskparams.
1840
1841 @param lu: A reference to the lu object
1842 @param node_uuid: The node to call out
1843 @param dev: The device to find
1844 @param instance: The instance object the device belongs to
1845 @returns The result of the rpc call
1846
1847 """
1848 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1849 return lu.rpc.call_blockdev_find(node_uuid, disk)
1850
1853 """Generate a suitable LV name.
1854
1855 This will generate a logical volume name for the given instance.
1856
1857 """
1858 results = []
1859 for val in exts:
1860 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1861 results.append("%s%s" % (new_id, val))
1862 return results
1863
1866 """Replaces disks for an instance.
1867
1868 Note: Locking is not within the scope of this class.
1869
1870 """
1871 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1872 remote_node_uuid, disks, early_release, ignore_ipolicy):
1873 """Initializes this class.
1874
1875 """
1876 Tasklet.__init__(self, lu)
1877
1878
1879 self.instance_uuid = instance_uuid
1880 self.instance_name = instance_name
1881 self.mode = mode
1882 self.iallocator_name = iallocator_name
1883 self.remote_node_uuid = remote_node_uuid
1884 self.disks = disks
1885 self.early_release = early_release
1886 self.ignore_ipolicy = ignore_ipolicy
1887
1888
1889 self.instance = None
1890 self.new_node_uuid = None
1891 self.target_node_uuid = None
1892 self.other_node_uuid = None
1893 self.remote_node_info = None
1894 self.node_secondary_ip = None
1895
1896 @staticmethod
1897 def _RunAllocator(lu, iallocator_name, instance_uuid,
1898 relocate_from_node_uuids):
1899 """Compute a new secondary node using an IAllocator.
1900
1901 """
1902 req = iallocator.IAReqRelocate(
1903 inst_uuid=instance_uuid,
1904 relocate_from_node_uuids=list(relocate_from_node_uuids))
1905 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1906
1907 ial.Run(iallocator_name)
1908
1909 if not ial.success:
1910 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1911 " %s" % (iallocator_name, ial.info),
1912 errors.ECODE_NORES)
1913
1914 remote_node_name = ial.result[0]
1915 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1916
1917 if remote_node is None:
1918 raise errors.OpPrereqError("Node %s not found in configuration" %
1919 remote_node_name, errors.ECODE_NOENT)
1920
1921 lu.LogInfo("Selected new secondary for instance '%s': %s",
1922 instance_uuid, remote_node_name)
1923
1924 return remote_node.uuid
1925
1932
1934 """Checks if the instance disks are activated.
1935
1936 @param instance: The instance to check disks
1937 @return: True if they are activated, False otherwise
1938
1939 """
1940 node_uuids = instance.all_nodes
1941
1942 for idx, dev in enumerate(instance.disks):
1943 for node_uuid in node_uuids:
1944 self.lu.LogInfo("Checking disk/%d on %s", idx,
1945 self.cfg.GetNodeName(node_uuid))
1946 self.cfg.SetDiskID(dev, node_uuid)
1947
1948 result = _BlockdevFind(self, node_uuid, dev, instance)
1949
1950 if result.offline:
1951 continue
1952 elif result.fail_msg or not result.payload:
1953 return False
1954
1955 return True
1956
1958 """Check prerequisites.
1959
1960 This checks that the instance is in the cluster.
1961
1962 """
1963 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1964 assert self.instance is not None, \
1965 "Cannot retrieve locked instance %s" % self.instance_name
1966
1967 if self.instance.disk_template != constants.DT_DRBD8:
1968 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1969 " instances", errors.ECODE_INVAL)
1970
1971 if len(self.instance.secondary_nodes) != 1:
1972 raise errors.OpPrereqError("The instance has a strange layout,"
1973 " expected one secondary but found %d" %
1974 len(self.instance.secondary_nodes),
1975 errors.ECODE_FAULT)
1976
1977 secondary_node_uuid = self.instance.secondary_nodes[0]
1978
1979 if self.iallocator_name is None:
1980 remote_node_uuid = self.remote_node_uuid
1981 else:
1982 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1983 self.instance.uuid,
1984 self.instance.secondary_nodes)
1985
1986 if remote_node_uuid is None:
1987 self.remote_node_info = None
1988 else:
1989 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1990 "Remote node '%s' is not locked" % remote_node_uuid
1991
1992 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1993 assert self.remote_node_info is not None, \
1994 "Cannot retrieve locked node %s" % remote_node_uuid
1995
1996 if remote_node_uuid == self.instance.primary_node:
1997 raise errors.OpPrereqError("The specified node is the primary node of"
1998 " the instance", errors.ECODE_INVAL)
1999
2000 if remote_node_uuid == secondary_node_uuid:
2001 raise errors.OpPrereqError("The specified node is already the"
2002 " secondary node of the instance",
2003 errors.ECODE_INVAL)
2004
2005 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2006 constants.REPLACE_DISK_CHG):
2007 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2008 errors.ECODE_INVAL)
2009
2010 if self.mode == constants.REPLACE_DISK_AUTO:
2011 if not self._CheckDisksActivated(self.instance):
2012 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2013 " first" % self.instance_name,
2014 errors.ECODE_STATE)
2015 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2016 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2017
2018 if faulty_primary and faulty_secondary:
2019 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2020 " one node and can not be repaired"
2021 " automatically" % self.instance_name,
2022 errors.ECODE_STATE)
2023
2024 if faulty_primary:
2025 self.disks = faulty_primary
2026 self.target_node_uuid = self.instance.primary_node
2027 self.other_node_uuid = secondary_node_uuid
2028 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2029 elif faulty_secondary:
2030 self.disks = faulty_secondary
2031 self.target_node_uuid = secondary_node_uuid
2032 self.other_node_uuid = self.instance.primary_node
2033 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2034 else:
2035 self.disks = []
2036 check_nodes = []
2037
2038 else:
2039
2040 if self.mode == constants.REPLACE_DISK_PRI:
2041 self.target_node_uuid = self.instance.primary_node
2042 self.other_node_uuid = secondary_node_uuid
2043 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2044
2045 elif self.mode == constants.REPLACE_DISK_SEC:
2046 self.target_node_uuid = secondary_node_uuid
2047 self.other_node_uuid = self.instance.primary_node
2048 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2049
2050 elif self.mode == constants.REPLACE_DISK_CHG:
2051 self.new_node_uuid = remote_node_uuid
2052 self.other_node_uuid = self.instance.primary_node
2053 self.target_node_uuid = secondary_node_uuid
2054 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2055
2056 CheckNodeNotDrained(self.lu, remote_node_uuid)
2057 CheckNodeVmCapable(self.lu, remote_node_uuid)
2058
2059 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2060 assert old_node_info is not None
2061 if old_node_info.offline and not self.early_release:
2062
2063 self.early_release = True
2064 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2065 " early-release mode", secondary_node_uuid)
2066
2067 else:
2068 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2069 self.mode)
2070
2071
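# No explicit disk list means every disk of the instance gets replaced.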
2072 if not self.disks:
2073 self.disks = range(len(self.instance.disks))
2074
2075
2076
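# When a new secondary was requested, verify that the instance still fits the
# instance policy of the target node's group before committing to the move.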
2077 if self.remote_node_info:
2078
2079 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2080 cluster = self.cfg.GetClusterInfo()
2081 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2082 new_group_info)
2083 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
2084 self.remote_node_info, self.cfg,
2085 ignore=self.ignore_ipolicy)
2086
2087 for node_uuid in check_nodes:
2088 CheckNodeOnline(self.lu, node_uuid)
2089
2090 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2091 self.other_node_uuid,
2092 self.target_node_uuid]
2093 if node_uuid is not None)
2094
2095
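# Narrow the node and node-resource locks down to the nodes actually involved;
# the node-allocation and node-group locks are no longer needed at all.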
2096 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2097 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2098 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2099
2100
2101 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2102
2103
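# Sanity check: every requested disk index must exist on the instance.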
2104 for disk_idx in self.disks:
2105 self.instance.FindDisk(disk_idx)
2106
2107
2108 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2109 in self.cfg.GetMultiNodeInfo(touched_nodes))
2110
2111 def Exec(self, feedback_fn):
2112 """Execute disk replacement.
2113
2114 This dispatches the disk replacement to the appropriate handler.
2115
2116 """
2117 if __debug__:
2118
2119 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2120 assert set(owned_nodes) == set(self.node_secondary_ip), \
2121 ("Incorrect node locks, owning %s, expected %s" %
2122 (owned_nodes, self.node_secondary_ip.keys()))
2123 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2124 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2125 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2126
2127 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2128 assert list(owned_instances) == [self.instance_name], \
2129 "Instance '%s' not locked" % self.instance_name
2130
2131 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2132 "Should not own any node group lock at this point"
2133
2134 if not self.disks:
2135 feedback_fn("No disks need replacement for instance '%s'" %
2136 self.instance.name)
2137 return
2138
2139 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2140 (utils.CommaJoin(self.disks), self.instance.name))
2141 feedback_fn("Current primary node: %s" %
2142 self.cfg.GetNodeName(self.instance.primary_node))
2143 feedback_fn("Current secondary node: %s" %
2144 utils.CommaJoin(self.cfg.GetNodeNames(
2145 self.instance.secondary_nodes)))
2146
2147 activate_disks = not self.instance.disks_active
2148
2149
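# If the instance is down, its disks are inactive; activate them for the
# duration of the replacement and shut them down again afterwards.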
2150 if activate_disks:
2151 StartInstanceDisks(self.lu, self.instance, True)
2152
2153 try:
2154
2155 if self.new_node_uuid is not None:
2156 fn = self._ExecDrbd8Secondary
2157 else:
2158 fn = self._ExecDrbd8DiskOnly
2159
2160 result = fn(feedback_fn)
2161 finally:
2162
2163
2164 if activate_disks:
2165 _SafeShutdownInstanceDisks(self.lu, self.instance)
2166
2167 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2168
2169 if __debug__:
2170
2171 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2172 nodes = frozenset(self.node_secondary_ip)
2173 assert ((self.early_release and not owned_nodes) or
2174 (not self.early_release and not (set(owned_nodes) - nodes))), \
2175 ("Not owning the correct locks, early_release=%s, owned=%r,"
2176 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2177
2178 return result
2179
2180 def _CheckVolumeGroup(self, node_uuids):
2181 self.lu.LogInfo("Checking volume groups")
2182
2183 vgname = self.cfg.GetVGName()
2184
2185
2186 results = self.rpc.call_vg_list(node_uuids)
2187 if not results:
2188 raise errors.OpExecError("Can't list volume groups on the nodes")
2189
2190 for node_uuid in node_uuids:
2191 res = results[node_uuid]
2192 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2193 if vgname not in res.payload:
2194 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2195 (vgname, self.cfg.GetNodeName(node_uuid)))
2196
2197 def _CheckDisksExistence(self, node_uuids):
2198
2199 for idx, dev in enumerate(self.instance.disks):
2200 if idx not in self.disks:
2201 continue
2202
2203 for node_uuid in node_uuids:
2204 self.lu.LogInfo("Checking disk/%d on %s", idx,
2205 self.cfg.GetNodeName(node_uuid))
2206 self.cfg.SetDiskID(dev, node_uuid)
2207
2208 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2209
2210 msg = result.fail_msg
2211 if msg or not result.payload:
2212 if not msg:
2213 msg = "disk not found"
2214 if not self._CheckDisksActivated(self.instance):
2215 extra_hint = ("\nDisks seem to be not properly activated. Try"
2216 " running activate-disks on the instance before"
2217 " using replace-disks.")
2218 else:
2219 extra_hint = ""
2220 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2221 (idx, self.cfg.GetNodeName(node_uuid), msg,
2222 extra_hint))
2223
2224 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2225 for idx, dev in enumerate(self.instance.disks):
2226 if idx not in self.disks:
2227 continue
2228
2229 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2230 (idx, self.cfg.GetNodeName(node_uuid)))
2231
2232 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2233 on_primary, ldisk=ldisk):
2234 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2235 " replace disks for instance %s" %
2236 (self.cfg.GetNodeName(node_uuid),
2237 self.instance.name))
2238
2240 """Create new storage on the primary or secondary node.
2241
2242 This is only used for same-node replaces, not for changing the
2243 secondary node, hence we don't want to modify the existing disk.
2244
2245 """
2246 iv_names = {}
2247
2248 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2249 for idx, dev in enumerate(disks):
2250 if idx not in self.disks:
2251 continue
2252
2253 self.lu.LogInfo("Adding storage on %s for disk/%d",
2254 self.cfg.GetNodeName(node_uuid), idx)
2255
2256 self.cfg.SetDiskID(dev, node_uuid)
2257
2258 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2259 names = _GenerateUniqueNames(self.lu, lv_names)
2260
2261 (data_disk, meta_disk) = dev.children
2262 vg_data = data_disk.logical_id[0]
2263 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2264 logical_id=(vg_data, names[0]),
2265 params=data_disk.params)
2266 vg_meta = meta_disk.logical_id[0]
2267 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2268 size=constants.DRBD_META_SIZE,
2269 logical_id=(vg_meta, names[1]),
2270 params=meta_disk.params)
2271
2272 new_lvs = [lv_data, lv_meta]
2273 old_lvs = [child.Copy() for child in dev.children]
2274 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2275 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2276
2277
2278 for new_lv in new_lvs:
2279 try:
2280 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2281 GetInstanceInfoText(self.instance), False,
2282 excl_stor)
2283 except errors.DeviceCreationError, e:
2284 raise errors.OpExecError("Can't create block device: %s" % e.message)
2285
2286 return iv_names
2287
2288 def _CheckDevices(self, node_uuid, iv_names):
2289 for name, (dev, _, _) in iv_names.iteritems():
2290 self.cfg.SetDiskID(dev, node_uuid)
2291
2292 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2293
2294 msg = result.fail_msg
2295 if msg or not result.payload:
2296 if not msg:
2297 msg = "disk not found"
2298 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2299 (name, msg))
2300
2301 if result.payload.is_degraded:
2302 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2303
2304 def _RemoveOldStorage(self, node_uuid, iv_names):
2305 for name, (_, old_lvs, _) in iv_names.iteritems():
2306 self.lu.LogInfo("Remove logical volumes for %s", name)
2307
2308 for lv in old_lvs:
2309 self.cfg.SetDiskID(lv, node_uuid)
2310
2311 msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2312 if msg:
2313 self.lu.LogWarning("Can't remove old LV: %s", msg,
2314 hint="remove unused LVs manually")
2315
2317 """Replace a disk on the primary or secondary for DRBD 8.
2318
2319 The algorithm for replace is quite complicated:
2320
2321 1. for each disk to be replaced:
2322
2323 1. create new LVs on the target node with unique names
2324 1. detach old LVs from the drbd device
2325 1. rename old LVs to name_replaced.<time_t>
2326 1. rename new LVs to old LVs
2327 1. attach the new LVs (with the old names now) to the drbd device
2328
2329 1. wait for sync across all devices
2330
2331 1. for each modified disk:
2332
2333 1. remove old LVs (which have the name name_replaced.<time_t>)
2334
2335 Failures are not very well handled.
2336
2337 """
2338 steps_total = 6
2339
2340
2341 self.lu.LogStep(1, steps_total, "Check device existence")
2342 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2343 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2344
2345
2346 self.lu.LogStep(2, steps_total, "Check peer consistency")
2347 self._CheckDisksConsistency(
2348 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2349 False)
2350
2351
2352 self.lu.LogStep(3, steps_total, "Allocate new storage")
2353 iv_names = self._CreateNewStorage(self.target_node_uuid)
2354
2355
2356 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2357 for dev, old_lvs, new_lvs in iv_names.itervalues():
2358 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2359
2360 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2361 old_lvs)
2362 result.Raise("Can't detach drbd from local storage on node"
2363 " %s for device %s" %
2364 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
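# The old LVs are renamed to "<name>_replaced-<timestamp>" first and the new
# LVs then take over the old names, so DRBD can be re-attached to its local
# storage without changing the device names it expects.  Only old LVs that
# are actually visible on the target node are queued for renaming, which is
# what the call_blockdev_find() loop below checks.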
2375 temp_suffix = int(time.time())
2376 ren_fn = lambda d, suff: (d.physical_id[0],
2377 d.physical_id[1] + "_replaced-%s" % suff)
2378
2379
2380 rename_old_to_new = []
2381 for to_ren in old_lvs:
2382 result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2383 if not result.fail_msg and result.payload:
2384
2385 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2386
2387 self.lu.LogInfo("Renaming the old LVs on the target node")
2388 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389 rename_old_to_new)
2390 result.Raise("Can't rename old LVs on node %s" %
2391 self.cfg.GetNodeName(self.target_node_uuid))
2392
2393
2394 self.lu.LogInfo("Renaming the new LVs on the target node")
2395 rename_new_to_old = [(new, old.physical_id)
2396 for old, new in zip(old_lvs, new_lvs)]
2397 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2398 rename_new_to_old)
2399 result.Raise("Can't rename new LVs on node %s" %
2400 self.cfg.GetNodeName(self.target_node_uuid))
2401
2402
2403 for old, new in zip(old_lvs, new_lvs):
2404 new.logical_id = old.logical_id
2405 self.cfg.SetDiskID(new, self.target_node_uuid)
2406
2407
2408
2409
2410 for disk in old_lvs:
2411 disk.logical_id = ren_fn(disk, temp_suffix)
2412 self.cfg.SetDiskID(disk, self.target_node_uuid)
2413
2414
2415 self.lu.LogInfo("Adding new mirror component on %s",
2416 self.cfg.GetNodeName(self.target_node_uuid))
2417 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418 (dev, self.instance), new_lvs)
2419 msg = result.fail_msg
2420 if msg:
2421 for new_lv in new_lvs:
2422 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2423 new_lv).fail_msg
2424 if msg2:
2425 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2426 hint=("cleanup manually the unused logical"
2427 "volumes"))
2428 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2429
2430 cstep = itertools.count(5)
2431
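# With early release the old storage is removed and the node-resource locks
# are dropped before waiting for the resync; otherwise the locks are merely
# narrowed to the involved nodes and the old LVs are removed only after the
# devices have synced.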
2432 if self.early_release:
2433 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2434 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2435
2436 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2437 else:
2438
2439 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2440 keep=self.node_secondary_ip.keys())
2441
2442
2443 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2444
2445
2446
2447
2448
2449
2450
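# Wait for the DRBD devices to resynchronise; the per-device state is then
# verified on the primary node via _CheckDevices() below.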
2451 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2452 WaitForSync(self.lu, self.instance)
2453
2454
2455 self._CheckDevices(self.instance.primary_node, iv_names)
2456
2457
2458 if not self.early_release:
2459 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2460 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2461
2463 """Replace the secondary node for DRBD 8.
2464
2465 The algorithm for replace is quite complicated:
2466 - for all disks of the instance:
2467 - create new LVs on the new node with same names
2468 - shutdown the drbd device on the old secondary
2469 - disconnect the drbd network on the primary
2470 - create the drbd device on the new secondary
2471 - network attach the drbd on the primary, using an artifice:
2472 the drbd code for Attach() will connect to the network if it
2473 finds a device which is connected to the correct local disks but
2474 not network enabled
2475 - wait for sync across all devices
2476 - remove all disks from the old secondary
2477
2478 Failures are not very well handled.
2479
2480 """
2481 steps_total = 6
2482
2483 pnode = self.instance.primary_node
2484
2485
2486 self.lu.LogStep(1, steps_total, "Check device existence")
2487 self._CheckDisksExistence([self.instance.primary_node])
2488 self._CheckVolumeGroup([self.instance.primary_node])
2489
2490
2491 self.lu.LogStep(2, steps_total, "Check peer consistency")
2492 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2493
2494
2495 self.lu.LogStep(3, steps_total, "Allocate new storage")
2496 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2497 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2498 self.new_node_uuid)
2499 for idx, dev in enumerate(disks):
2500 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2501 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502
2503 for new_lv in dev.children:
2504 try:
2505 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2506 new_lv, True, GetInstanceInfoText(self.instance),
2507 False, excl_stor)
2508 except errors.DeviceCreationError, e:
2509 raise errors.OpExecError("Can't create block device: %s" % e.message)
2510
2511
2512
2513
2514 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2515 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2516 for _ in self.instance.disks],
2517 self.instance.uuid)
2518 logging.debug("Allocated minors %r", minors)
2519
2520 iv_names = {}
2521 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2522 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2523 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2524
2525
2526
2527
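# A DRBD logical_id is (nodeA, nodeB, port, minorA, minorB, secret); pick the
# minor that belongs to the primary node so it can be kept, while the new
# node gets the freshly allocated minor.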
2528 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2529 if self.instance.primary_node == o_node1:
2530 p_minor = o_minor1
2531 else:
2532 assert self.instance.primary_node == o_node2, "Three-node instance?"
2533 p_minor = o_minor2
2534
2535 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2536 p_minor, new_minor, o_secret)
2537 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2538 p_minor, new_minor, o_secret)
2539
2540 iv_names[idx] = (dev, dev.children, new_net_id)
2541 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2542 new_net_id)
2543 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2544 logical_id=new_alone_id,
2545 children=dev.children,
2546 size=dev.size,
2547 params={})
2548 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2549 self.cfg)
2550 try:
2551 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2552 anno_new_drbd,
2553 GetInstanceInfoText(self.instance), False,
2554 excl_stor)
2555 except errors.GenericError:
2556 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2557 raise
2558
2559
2560 for idx, dev in enumerate(self.instance.disks):
2561 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2562 self.cfg.SetDiskID(dev, self.target_node_uuid)
2563 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2564 (dev, self.instance)).fail_msg
2565 if msg:
2566 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2567 "node: %s" % (idx, msg),
2568 hint=("Please cleanup this device manually as"
2569 " soon as possible"))
2570
2571 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2572 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2573 self.instance.disks)[pnode]
2574
2575 msg = result.fail_msg
2576 if msg:
2577
2578 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2579 raise errors.OpExecError("Can't detach the disks from the network on"
2580 " old node: %s" % (msg,))
2581
2582
2583
2584 self.lu.LogInfo("Updating instance configuration")
2585 for dev, _, new_logical_id in iv_names.itervalues():
2586 dev.logical_id = new_logical_id
2587 self.cfg.SetDiskID(dev, self.instance.primary_node)
2588
2589 self.cfg.Update(self.instance, feedback_fn)
2590
2591
2592 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2593
2594
2595 self.lu.LogInfo("Attaching primary drbds to new secondary"
2596 " (standalone => connected)")
2597 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2598 self.new_node_uuid],
2599 self.node_secondary_ip,
2600 (self.instance.disks, self.instance),
2601 self.instance.name,
2602 False)
2603 for to_node, to_result in result.items():
2604 msg = to_result.fail_msg
2605 if msg:
2606 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2607 self.cfg.GetNodeName(to_node), msg,
2608 hint=("please do a gnt-instance info to see the"
2609 " status of disks"))
2610
2611 cstep = itertools.count(5)
2612
2613 if self.early_release:
2614 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2615 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2616
2617 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2618 else:
2619
2620 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2621 keep=self.node_secondary_ip.keys())
2622
2623
2624
2625
2626
2627
2628
2629 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2630 WaitForSync(self.lu, self.instance)
2631
2632
2633 self._CheckDevices(self.instance.primary_node, iv_names)
2634
2635
2636 if not self.early_release:
2637 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2638 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2639