31 """Logical units dealing with storage of instances."""
32
33 import itertools
34 import logging
35 import os
36 import time
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ht
42 from ganeti import locking
43 from ganeti.masterd import iallocator
44 from ganeti import objects
45 from ganeti import utils
46 import ganeti.rpc.node as rpc
47 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
48 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
49 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
50 ComputeIPolicyDiskSizesViolation, \
51 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
52 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
53 CheckDiskTemplateEnabled
54 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
55 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
56 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
57
58 import ganeti.masterd.instance
59
60
61 _DISK_TEMPLATE_NAME_PREFIX = {
62 constants.DT_PLAIN: "",
63 constants.DT_RBD: ".rbd",
64 constants.DT_EXT: ".ext",
65 constants.DT_FILE: ".file",
66 constants.DT_SHARED_FILE: ".sharedfile",
67 }
72 """Create a single block device on a given node.
73
74 This will not recurse over children of the device, so they must be
75 created in advance.
76
77 @param lu: the lu on whose behalf we execute
78 @param node_uuid: the node on which to create the device
79 @type instance: L{objects.Instance}
80 @param instance: the instance which owns the device
81 @type device: L{objects.Disk}
82 @param device: the device to create
83 @param info: the extra 'metadata' we should attach to the device
84 (this will be represented as a LVM tag)
85 @type force_open: boolean
86 @param force_open: this parameter will be passed to the
87 L{backend.BlockdevCreate} function where it specifies
88 whether we run on primary or not, and it affects both
89 the child assembly and the device's own Open() execution
90 @type excl_stor: boolean
91 @param excl_stor: Whether exclusive_storage is active for the node
92
93 """
94 result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
95 device.size, instance.name, force_open,
96 info, excl_stor)
97 result.Raise("Can't create block device %s on"
98 " node %s for instance %s" % (device,
99 lu.cfg.GetNodeName(node_uuid),
100 instance.name))
101
102
103 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
104 info, force_open, excl_stor):
105 """Create a tree of block devices on a given node.
106
107 If this device type has to be created on secondaries, create it and
108 all its children.
109
110 If not, just recurse to children keeping the same 'force' value.
111
112 @attention: The device has to be annotated already.
113
114 @param lu: the lu on whose behalf we execute
115 @param node_uuid: the node on which to create the device
116 @type instance: L{objects.Instance}
117 @param instance: the instance which owns the device
118 @type device: L{objects.Disk}
119 @param device: the device to create
120 @type force_create: boolean
121 @param force_create: whether to force creation of this device; this
122 will be changed to True whenever we find a device whose
123 CreateOnSecondary() method returns True
124 @param info: the extra 'metadata' we should attach to the device
125 (this will be represented as a LVM tag)
126 @type force_open: boolean
127 @param force_open: this parameter will be passed to the
128 L{backend.BlockdevCreate} function where it specifies
129 whether we run on primary or not, and it affects both
130 the child assembly and the device's own Open() execution
131 @type excl_stor: boolean
132 @param excl_stor: Whether exclusive_storage is active for the node
133
134 @return: list of created devices
135 """
136 created_devices = []
137 try:
138 if device.CreateOnSecondary():
139 force_create = True
140
141 if device.children:
142 for child in device.children:
143 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
144 force_create, info, force_open, excl_stor)
145 created_devices.extend(devs)
146
147 if not force_create:
148 return created_devices
149
150 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
151 excl_stor)
152
153
154 created_devices = [(node_uuid, device)]
155 return created_devices
156
157 except errors.DeviceCreationError, e:
158 e.created_devices.extend(created_devices)
159 raise e
160 except errors.OpExecError, e:
161 raise errors.DeviceCreationError(str(e), created_devices)
162
165 """Whether exclusive_storage is in effect for the given node.
166
167 @type cfg: L{config.ConfigWriter}
168 @param cfg: The cluster configuration
169 @type node_uuid: string
170 @param node_uuid: The node UUID
171 @rtype: bool
172 @return: The effective value of exclusive_storage
173 @raise errors.OpPrereqError: if no node exists with the given UUID
174
175 """
176 ni = cfg.GetNodeInfo(node_uuid)
177 if ni is None:
178 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
179 errors.ECODE_NOENT)
180 return IsExclusiveStorageEnabledNode(cfg, ni)
181
182
183 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
184 force_open):
185 """Wrapper around L{_CreateBlockDevInner}.
186
187 This method annotates the root device first.
188
189 """
190 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
191 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
192 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
193 force_open, excl_stor)
194
197 """Undo the work performed by L{CreateDisks}.
198
199 This function is called in case of an error to undo the work of
200 L{CreateDisks}.
201
202 @type lu: L{LogicalUnit}
203 @param lu: the logical unit on whose behalf we execute
204 @param disks_created: the result returned by L{CreateDisks}
205 @type instance: L{objects.Instance}
206 @param instance: the instance for which disks were created
207
208 """
209 for (node_uuid, disk) in disks_created:
210 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
211 result.Warn("Failed to remove newly-created disk %s on node %s" %
212 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
214
215 def CreateDisks(lu, instance, instance_disks=None,
216 to_skip=None, target_node_uuid=None, disks=None):
217 """Create all disks for an instance.
218
219 This abstracts away some work from AddInstance.
220
221 Since the instance may not have been saved to the config file yet, this
222 function can not query the config file for the instance's disks; in that
223 case they need to be passed as an argument.
224
225 @type lu: L{LogicalUnit}
226 @param lu: the logical unit on whose behalf we execute
227 @type instance: L{objects.Instance}
228 @param instance: the instance whose disks we should create
229 @type instance_disks: list of L{objects.Disk}
230 @param instance_disks: the disks that belong to the instance; if not
231 specified, retrieve them from config file
232 @type to_skip: list
233 @param to_skip: list of indices to skip
234 @type target_node_uuid: string
235 @param target_node_uuid: if passed, overrides the target node for creation
236 @type disks: list of L{objects.Disk}
237 @param disks: the disks to create; if not specified, all the disks of the
238 instance are created
239 @return: information about the created disks, to be used to call
240 L{_UndoCreateDisks}
241 @raise errors.OpPrereqError: in case of error
242
243 """
244 info = GetInstanceInfoText(instance)
245 if instance_disks is None:
246 instance_disks = lu.cfg.GetInstanceDisks(instance.uuid)
247 if target_node_uuid is None:
248 pnode_uuid = instance.primary_node
249
250
251
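# The instance may not be stored in the configuration yet (see the function
# docstring), so the node list is collected from the disk objects rather than
# queried from the config; the primary node is deliberately kept first in the
# resulting list.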
252 all_node_uuids = []
253 for disk in instance_disks:
254 all_node_uuids.extend(disk.all_nodes)
255 all_node_uuids = set(all_node_uuids)
256
257 all_node_uuids.discard(pnode_uuid)
258 all_node_uuids = [pnode_uuid] + list(all_node_uuids)
259 else:
260 pnode_uuid = target_node_uuid
261 all_node_uuids = [pnode_uuid]
262
263 if disks is None:
264 disks = instance_disks
265
266 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
267
268 if instance.disk_template in constants.DTS_FILEBASED:
269 file_storage_dir = os.path.dirname(instance_disks[0].logical_id[1])
270 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
271
272 result.Raise("Failed to create directory '%s' on"
273 " node %s" % (file_storage_dir,
274 lu.cfg.GetNodeName(pnode_uuid)))
275
276 disks_created = []
277 for idx, device in enumerate(disks):
278 if to_skip and idx in to_skip:
279 continue
280 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
281 for node_uuid in all_node_uuids:
282 f_create = node_uuid == pnode_uuid
283 try:
284 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
285 f_create)
286 disks_created.append((node_uuid, device))
287 except errors.DeviceCreationError, e:
288 logging.warning("Creating disk %s for instance '%s' failed",
289 idx, instance.name)
290 disks_created.extend(e.created_devices)
291 _UndoCreateDisks(lu, disks_created, instance)
292 raise errors.OpExecError(e.message)
293 return disks_created
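# Typical usage keeps the return value around so that a later failure (for
# example while wiping) can roll everything back, e.g.:
#   disks_created = CreateDisks(lu, instance)
#   WipeOrCleanupDisks(lu, instance, cleanup=disks_created)
# (see the recreate-disks Exec() method further below for a real caller).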
294
297 """Compute disk size requirements in the volume group
298
299 """
300 def _compute(disks, payload):
301 """Universal algorithm.
302
303 """
304 vgs = {}
305 for disk in disks:
306 vgs[disk[constants.IDISK_VG]] = \
307 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
308
309 return vgs
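# _compute returns a dict mapping volume group name to the total space (in
# MiB) needed in that group, adding 'payload' once per disk (the DRBD case
# passes constants.DRBD_META_SIZE as payload); e.g. two 1024 MiB disks in the
# same volume group with a payload of 128 MiB yield a total of 2304 MiB.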
310
311
312 req_size_dict = {
313 constants.DT_DISKLESS: {},
314 constants.DT_PLAIN: _compute(disks, 0),
315
316 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
317 constants.DT_FILE: {},
318 constants.DT_SHARED_FILE: {},
319 constants.DT_GLUSTER: {},
320 }
321
322 if disk_template not in req_size_dict:
323 raise errors.ProgrammerError("Disk template '%s' size requirement"
324 " is unknown" % disk_template)
325
326 return req_size_dict[disk_template]
327
330 """Computes the instance disks.
331
332 @param op: The instance opcode
333 @param default_vg: The default_vg to assume
334
335 @return: The computed disks
336
337 """
338 disks = []
339 for disk in op.disks:
340 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
341 if mode not in constants.DISK_ACCESS_SET:
342 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
343 mode, errors.ECODE_INVAL)
344 size = disk.get(constants.IDISK_SIZE, None)
345 if size is None:
346 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
347 try:
348 size = int(size)
349 except (TypeError, ValueError):
350 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
351 errors.ECODE_INVAL)
352
353 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
354 if ext_provider and op.disk_template != constants.DT_EXT:
355 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
356 " disk template, not %s" %
357 (constants.IDISK_PROVIDER, constants.DT_EXT,
358 op.disk_template), errors.ECODE_INVAL)
359
360 data_vg = disk.get(constants.IDISK_VG, default_vg)
361 name = disk.get(constants.IDISK_NAME, None)
362 if name is not None and name.lower() == constants.VALUE_NONE:
363 name = None
364 new_disk = {
365 constants.IDISK_SIZE: size,
366 constants.IDISK_MODE: mode,
367 constants.IDISK_VG: data_vg,
368 constants.IDISK_NAME: name,
369 }
370
371 for key in [
372 constants.IDISK_METAVG,
373 constants.IDISK_ADOPT,
374 constants.IDISK_SPINDLES,
375 ]:
376 if key in disk:
377 new_disk[key] = disk[key]
378
379
380
381 if op.disk_template == constants.DT_EXT:
382 if ext_provider:
383 new_disk[constants.IDISK_PROVIDER] = ext_provider
384 for key in disk:
385 if key not in constants.IDISK_PARAMS:
386 new_disk[key] = disk[key]
387 else:
388 raise errors.OpPrereqError("Missing provider for template '%s'" %
389 constants.DT_EXT, errors.ECODE_INVAL)
390
391 disks.append(new_disk)
392
393 return disks
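# Each entry of op.disks is a user-supplied dict keyed by the IDISK_* names;
# the normalised dicts built here always carry a size (in MiB), an access
# mode, a volume group and a name, plus any of metavg/adopt/spindles that
# were given, and for DT_EXT additionally the provider and its
# provider-specific options.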
394
397 """Compute disk size requirements inside the RADOS cluster.
398
399 """
400
401 pass
402
403
404 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
405 iv_name, p_minor, s_minor):
406 """Generate a drbd8 device complete with its children.
407
408 """
409 assert len(vgnames) == len(names) == 2
410 port = lu.cfg.AllocatePort()
411 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
412
413 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
414 logical_id=(vgnames[0], names[0]),
415 params={})
416 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
417 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
418 size=constants.DRBD_META_SIZE,
419 logical_id=(vgnames[1], names[1]),
420 params={})
421 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
422 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
423 logical_id=(primary_uuid, secondary_uuid, port,
424 p_minor, s_minor,
425 shared_secret),
426 children=[dev_data, dev_meta],
427 iv_name=iv_name, params={})
428 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
429 return drbd_dev
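# The resulting DRBD8 disk has two DT_PLAIN (LVM) children -- the data LV and
# the fixed-size metadata LV -- and a logical_id of the form
# (primary_uuid, secondary_uuid, port, primary_minor, secondary_minor,
#  shared_secret).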
430
431
432 def GenerateDiskTemplate(
433 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
434 disk_info, file_storage_dir, file_driver, base_index,
435 feedback_fn, full_disk_params):
436 """Generate the entire disk layout for a given template type.
437
438 """
439 vgname = lu.cfg.GetVGName()
440 disk_count = len(disk_info)
441 disks = []
442
443 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
444
445 if template_name == constants.DT_DISKLESS:
446 pass
447 elif template_name == constants.DT_DRBD8:
448 if len(secondary_node_uuids) != 1:
449 raise errors.ProgrammerError("Wrong template configuration")
450 remote_node_uuid = secondary_node_uuids[0]
451 minors = lu.cfg.AllocateDRBDMinor(
452 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
453
454 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
455 full_disk_params)
456 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
457
458 names = []
459 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
460 for i in range(disk_count)]):
461 names.append(lv_prefix + "_data")
462 names.append(lv_prefix + "_meta")
463 for idx, disk in enumerate(disk_info):
464 disk_index = idx + base_index
465 data_vg = disk.get(constants.IDISK_VG, vgname)
466 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
467 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
468 disk[constants.IDISK_SIZE],
469 [data_vg, meta_vg],
470 names[idx * 2:idx * 2 + 2],
471 "disk/%d" % disk_index,
472 minors[idx * 2], minors[idx * 2 + 1])
473 disk_dev.mode = disk[constants.IDISK_MODE]
474 disk_dev.name = disk.get(constants.IDISK_NAME, None)
475 disks.append(disk_dev)
476 else:
477 if secondary_node_uuids:
478 raise errors.ProgrammerError("Wrong template configuration")
479
480 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
481 if name_prefix is None:
482 names = None
483 else:
484 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
485 (name_prefix, base_index + i)
486 for i in range(disk_count)])
487
488 if template_name == constants.DT_PLAIN:
489
490 def logical_id_fn(idx, _, disk):
491 vg = disk.get(constants.IDISK_VG, vgname)
492 return (vg, names[idx])
493
494 elif template_name == constants.DT_GLUSTER:
495 logical_id_fn = lambda _1, disk_index, _2: \
496 (file_driver, "ganeti/%s.%d" % (instance_uuid,
497 disk_index))
498
499 elif template_name in constants.DTS_FILEBASED:
500 logical_id_fn = \
501 lambda _, disk_index, disk: (file_driver,
502 "%s/%s" % (file_storage_dir,
503 names[idx]))
504 elif template_name == constants.DT_BLOCK:
505 logical_id_fn = \
506 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
507 disk[constants.IDISK_ADOPT])
508 elif template_name == constants.DT_RBD:
509 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
510 elif template_name == constants.DT_EXT:
511 def logical_id_fn(idx, _, disk):
512 provider = disk.get(constants.IDISK_PROVIDER, None)
513 if provider is None:
514 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
515 " not found", constants.DT_EXT,
516 constants.IDISK_PROVIDER)
517 return (provider, names[idx])
518 else:
519 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
520
521 dev_type = template_name
522
523 for idx, disk in enumerate(disk_info):
524 params = {}
525
526 if template_name == constants.DT_EXT:
527 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
528 for key in disk:
529 if key not in constants.IDISK_PARAMS:
530 params[key] = disk[key]
531 disk_index = idx + base_index
532 size = disk[constants.IDISK_SIZE]
533 feedback_fn("* disk %s, size %s" %
534 (disk_index, utils.FormatUnit(size, "h")))
535 disk_dev = objects.Disk(dev_type=dev_type, size=size,
536 logical_id=logical_id_fn(idx, disk_index, disk),
537 iv_name="disk/%d" % disk_index,
538 mode=disk[constants.IDISK_MODE],
539 params=params,
540 spindles=disk.get(constants.IDISK_SPINDLES))
541 disk_dev.name = disk.get(constants.IDISK_NAME, None)
542 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
543 disks.append(disk_dev)
544
545 return disks
546
549 """Check the presence of the spindle options with exclusive_storage.
550
551 @type diskdict: dict
552 @param diskdict: disk parameters
553 @type es_flag: bool
554 @param es_flag: the effective value of the exclusive_storage flag
555 @type required: bool
556 @param required: whether spindles are required or just optional
557 @raise errors.OpPrereqError: when spindles are given and they should not be
558
559 """
560 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
561 diskdict[constants.IDISK_SPINDLES] is not None):
562 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
563 " when exclusive storage is not active",
564 errors.ECODE_INVAL)
565 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
566 diskdict[constants.IDISK_SPINDLES] is None)):
567 raise errors.OpPrereqError("You must specify spindles in instance disks"
568 " when exclusive storage is active",
569 errors.ECODE_INVAL)
570
573 """Recreate an instance's missing disks.
574
575 """
576 HPATH = "instance-recreate-disks"
577 HTYPE = constants.HTYPE_INSTANCE
578 REQ_BGL = False
579
580 _MODIFYABLE = compat.UniqueFrozenset([
581 constants.IDISK_SIZE,
582 constants.IDISK_MODE,
583 constants.IDISK_SPINDLES,
584 ])
585
586
587 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
588 constants.IDISK_ADOPT,
589
590
591 constants.IDISK_VG,
592 constants.IDISK_METAVG,
593 constants.IDISK_PROVIDER,
594 constants.IDISK_NAME,
595 ]))
596
598 """Run the allocator based on input opcode.
599
600 """
601 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
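# Build an allocation request that mirrors the instance's current resources
# (disks, backend parameters, OS and hypervisor) so the allocator can pick a
# complete replacement node set for the recreated disks.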
620 disk_template = self.instance.disk_template
621 spindle_use = be_full[constants.BE_SPINDLE_USE]
622 disks = [{
623 constants.IDISK_SIZE: d.size,
624 constants.IDISK_MODE: d.mode,
625 constants.IDISK_SPINDLES: d.spindles,
626 } for d in self.cfg.GetInstanceDisks(self.instance.uuid)]
627 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
628 disk_template=disk_template,
629 tags=list(self.instance.GetTags()),
630 os=self.instance.os,
631 nics=[{}],
632 vcpus=be_full[constants.BE_VCPUS],
633 memory=be_full[constants.BE_MAXMEM],
634 spindle_use=spindle_use,
635 disks=disks,
636 hypervisor=self.instance.hypervisor,
637 node_whitelist=None)
638 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
639
640 ial.Run(self.op.iallocator)
641
642 assert req.RequiredNodes() == \
643 len(self.cfg.GetInstanceNodes(self.instance.uuid))
644
645 if not ial.success:
646 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
647 " %s" % (self.op.iallocator, ial.info),
648 errors.ECODE_NORES)
649
650 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
651 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
652 self.op.instance_name, self.op.iallocator,
653 utils.CommaJoin(self.op.nodes))
654
679
695
731
739
747
749 """Check prerequisites.
750
751 This checks that the instance is in the cluster and is not running.
752
753 """
754 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
755 assert instance is not None, \
756 "Cannot retrieve locked instance %s" % self.op.instance_name
757 if self.op.node_uuids:
758 inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
759 if len(self.op.node_uuids) != len(inst_nodes):
760 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
761 " %d replacement nodes were specified" %
762 (instance.name, len(inst_nodes),
763 len(self.op.node_uuids)),
764 errors.ECODE_INVAL)
765 assert instance.disk_template != constants.DT_DRBD8 or \
766 len(self.op.node_uuids) == 2
767 assert instance.disk_template != constants.DT_PLAIN or \
768 len(self.op.node_uuids) == 1
769 primary_node = self.op.node_uuids[0]
770 else:
771 primary_node = instance.primary_node
772 if not self.op.iallocator:
773 CheckNodeOnline(self, primary_node)
774
775 if instance.disk_template == constants.DT_DISKLESS:
776 raise errors.OpPrereqError("Instance '%s' has no disks" %
777 self.op.instance_name, errors.ECODE_INVAL)
778
779
780 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
781 if owned_groups:
782
783
784 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
785 primary_only=True)
786
787
788
789 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
790 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
791 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
792 msg="cannot recreate disks")
793
794 if self.op.disks:
795 self.disks = dict(self.op.disks)
796 else:
797 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
798
799 maxidx = max(self.disks.keys())
800 if maxidx >= len(instance.disks):
801 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
802 errors.ECODE_INVAL)
803
804 if ((self.op.node_uuids or self.op.iallocator) and
805 sorted(self.disks.keys()) != range(len(instance.disks))):
806 raise errors.OpPrereqError("Can't recreate disks partially and"
807 " change the nodes at the same time",
808 errors.ECODE_INVAL)
809
810 self.instance = instance
811
812 if self.op.iallocator:
813 self._RunAllocator()
814
815 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
816 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
817 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
818
819 if self.op.node_uuids:
820 node_uuids = self.op.node_uuids
821 else:
822 node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
823 excl_stor = compat.any(
824 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
825 )
826 for new_params in self.disks.values():
827 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
828
829 def Exec(self, feedback_fn):
830 """Recreate the disks.
831
832 """
833 assert (self.owned_locks(locking.LEVEL_NODE) ==
834 self.owned_locks(locking.LEVEL_NODE_RES))
835
836 to_skip = []
837 mods = []
838
839 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
840 for idx, disk in enumerate(inst_disks):
841 try:
842 changes = self.disks[idx]
843 except KeyError:
844
845 to_skip.append(idx)
846 continue
847
848
849 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
850
851 assert len(self.op.node_uuids) == 2
852 assert len(disk.logical_id) == 6
853
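# Keep the existing DRBD port and shared secret, but allocate fresh minors on
# the requested node pair; the rest of the logical_id is rebuilt from the new
# nodes.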
854 (_, _, old_port, _, _, old_secret) = disk.logical_id
855 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
856 self.instance.uuid)
857 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
858 new_minors[0], new_minors[1], old_secret)
859 assert len(disk.logical_id) == len(new_id)
860 else:
861 new_id = None
862
863 mods.append((idx, new_id, changes))
864
865
866
867 for idx, new_id, changes in mods:
868 disk = inst_disks[idx]
869 if new_id is not None:
870 assert disk.dev_type == constants.DT_DRBD8
871 disk.logical_id = new_id
872 if changes:
873 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
874 mode=changes.get(constants.IDISK_MODE, None),
875 spindles=changes.get(constants.IDISK_SPINDLES, None))
876 self.cfg.Update(disk, feedback_fn)
877
878
879 if self.op.node_uuids:
880 self.instance.primary_node = self.op.node_uuids[0]
881 self.LogWarning("Changing the instance's nodes, you will have to"
882 " remove any disks left on the older nodes manually")
883
884 if self.op.node_uuids:
885 self.cfg.Update(self.instance, feedback_fn)
886
887
888 mylocks = self.owned_locks(locking.LEVEL_NODE)
889 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
890 assert mylocks.issuperset(frozenset(inst_nodes))
891 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
892
893
894 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
895 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
896 wipedisks = [(idx, disk, 0)
897 for (idx, disk) in enumerate(inst_disks)
898 if idx not in to_skip]
899 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
900 cleanup=new_disks)
901
922
925 """Checks the vg capacity for a given node.
926
927 @type node_info: tuple (_, list of dicts, _)
928 @param node_info: the result of the node info call for one node
929 @type node_name: string
930 @param node_name: the name of the node
931 @type vg: string
932 @param vg: volume group name
933 @type requested: int
934 @param requested: the amount of disk in MiB to check for
935 @raise errors.OpPrereqError: if the node doesn't have enough disk,
936 or we cannot check the node
937
938 """
939 (_, space_info, _) = node_info
940 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
941 space_info, constants.ST_LVM_VG)
942 if not lvm_vg_info:
943 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
944 vg_free = lvm_vg_info.get("storage_free", None)
945 if not isinstance(vg_free, int):
946 raise errors.OpPrereqError("Can't compute free disk space on node"
947 " %s for vg %s, result was '%s'" %
948 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
949 if requested > vg_free:
950 raise errors.OpPrereqError("Not enough disk space on target node %s"
951 " vg %s: required %d MiB, available %d MiB" %
952 (node_name, vg, requested, vg_free),
953 errors.ECODE_NORES)
954
957 """Checks if nodes have enough free disk space in the specified VG.
958
959 This function checks if all given nodes have the needed amount of
960 free disk. In case any node has less disk or we cannot get the
961 information from the node, this function raises an OpPrereqError
962 exception.
963
964 @type lu: C{LogicalUnit}
965 @param lu: a logical unit from which we get configuration data
966 @type node_uuids: C{list}
967 @param node_uuids: the list of node UUIDs to check
968 @type vg: C{str}
969 @param vg: the volume group to check
970 @type requested: C{int}
971 @param requested: the amount of disk in MiB to check for
972 @raise errors.OpPrereqError: if the node doesn't have enough disk,
973 or we cannot check the node
974
975 """
976 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
977 for node_uuid in node_uuids:
978 node_name = lu.cfg.GetNodeName(node_uuid)
979 info = nodeinfo[node_uuid]
980 info.Raise("Cannot get current information from node %s" % node_name,
981 prereq=True, ecode=errors.ECODE_ENVIRON)
982 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
983
986 """Checks if nodes have enough free disk space in all the VGs.
987
988 This function checks if all given nodes have the needed amount of
989 free disk. In case any node has less disk or we cannot get the
990 information from the node, this function raises an OpPrereqError
991 exception.
992
993 @type lu: C{LogicalUnit}
994 @param lu: a logical unit from which we get configuration data
995 @type node_uuids: C{list}
996 @param node_uuids: the list of node UUIDs to check
997 @type req_sizes: C{dict}
998 @param req_sizes: the hash of vg and corresponding amount of disk in
999 MiB to check for
1000 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1001 or we cannot check the node
1002
1003 """
1004 for vg, req_size in req_sizes.items():
1005 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
1006
1009 """Converts a disk size in bytes to mebibytes.
1010
1011 Warns and rounds up if the size isn't an even multiple of 1 MiB.
1012
1013 """
1014 (mib, remainder) = divmod(size, 1024 * 1024)
1015
1016 if remainder != 0:
1017 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
1018 " to not overwrite existing data (%s bytes will not be"
1019 " wiped)", (1024 * 1024) - remainder)
1020 mib += 1
1021
1022 return mib
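# Example: a disk of 10485761 bytes (10 MiB plus one byte) yields
# divmod(10485761, 1048576) == (10, 1), triggers the warning above and is
# reported as 11 MiB.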
1023
1024
1025 def _CalcEta(time_taken, written, total_size):
1026 """Calculates the ETA based on size written and total size.
1027
1028 @param time_taken: The time taken so far
1029 @param written: amount written so far
1030 @param total_size: The total size of data to be written
1031 @return: The remaining time in seconds
1032
1033 """
1034 avg_time = time_taken / float(written)
1035 return (total_size - written) * avg_time
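# Example: if 4096 MiB out of 16384 MiB were written in 120 seconds, the
# average is 120/4096 s/MiB and the remaining 12288 MiB give an ETA of
# 360 seconds.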
1036
1037
1038 def WipeDisks(lu, instance, disks=None):
1039 """Wipes instance disks.
1040
1041 @type lu: L{LogicalUnit}
1042 @param lu: the logical unit on whose behalf we execute
1043 @type instance: L{objects.Instance}
1044 @param instance: the instance whose disks we should create
1045 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1046 @param disks: Disk details; tuple contains disk index, disk object and the
1047 start offset
1048
1049 """
1050 node_uuid = instance.primary_node
1051 node_name = lu.cfg.GetNodeName(node_uuid)
1052
1053 if disks is None:
1054 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1055 disks = [(idx, disk, 0)
1056 for (idx, disk) in enumerate(inst_disks)]
1057
1058 logging.info("Pausing synchronization of disks of instance '%s'",
1059 instance.name)
1060 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1061 (map(compat.snd, disks),
1062 instance),
1063 True)
1064 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1065
1066 for idx, success in enumerate(result.payload):
1067 if not success:
1068 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1069 " failed", idx, instance.name)
1070
1071 try:
1072 for (idx, device, offset) in disks:
1073
1074
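# The wipe is issued in chunks: a fixed percentage of the disk size per
# request, capped at MAX_WIPE_CHUNK, so that progress (and an ETA) can be
# reported between requests.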
1075 wipe_chunk_size = \
1076 int(min(constants.MAX_WIPE_CHUNK,
1077 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1078
1079 size = device.size
1080 last_output = 0
1081 start_time = time.time()
1082
1083 if offset == 0:
1084 info_text = ""
1085 else:
1086 info_text = (" (from %s to %s)" %
1087 (utils.FormatUnit(offset, "h"),
1088 utils.FormatUnit(size, "h")))
1089
1090 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1091
1092 logging.info("Wiping disk %d for instance %s on node %s using"
1093 " chunk size %s", idx, instance.name, node_name,
1094 wipe_chunk_size)
1095
1096 while offset < size:
1097 wipe_size = min(wipe_chunk_size, size - offset)
1098
1099 logging.debug("Wiping disk %d, offset %s, chunk %s",
1100 idx, offset, wipe_size)
1101
1102 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1103 offset, wipe_size)
1104 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1105 (idx, offset, wipe_size))
1106
1107 now = time.time()
1108 offset += wipe_size
1109 if now - last_output >= 60:
1110 eta = _CalcEta(now - start_time, offset, size)
1111 lu.LogInfo(" - done: %.1f%% ETA: %s",
1112 offset / float(size) * 100, utils.FormatSeconds(eta))
1113 last_output = now
1114 finally:
1115 logging.info("Resuming synchronization of disks for instance '%s'",
1116 instance.name)
1117
1118 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1119 (map(compat.snd, disks),
1120 instance),
1121 False)
1122
1123 if result.fail_msg:
1124 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1125 node_name, result.fail_msg)
1126 else:
1127 for idx, success in enumerate(result.payload):
1128 if not success:
1129 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1130 " failed", idx, instance.name)
1131
1132
1133 def ImageDisks(lu, instance, image, disks=None):
1134 """Dumps an image onto an instance disk.
1135
1136 @type lu: L{LogicalUnit}
1137 @param lu: the logical unit on whose behalf we execute
1138 @type instance: L{objects.Instance}
1139 @param instance: the instance whose disks we should create
1140 @type image: string
1141 @param image: the image whose disks we should create
1142 @type disks: None or list of ints
1143 @param disks: disk indices
1144
1145 """
1146 node_uuid = instance.primary_node
1147 node_name = lu.cfg.GetNodeName(node_uuid)
1148
1149 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1150 if disks is None:
1151 disks = [(0, inst_disks[0])]
1152 else:
1153 disks = map(lambda idx: (idx, inst_disks[idx]), disks)
1154
1155 logging.info("Pausing synchronization of disks of instance '%s'",
1156 instance.name)
1157 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1158 (map(compat.snd, disks),
1159 instance),
1160 True)
1161 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1162
1163 for idx, success in enumerate(result.payload):
1164 if not success:
1165 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1166 " failed", idx, instance.name)
1167
1168 try:
1169 for (idx, device) in disks:
1170 lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
1171 idx, instance.name, node_name)
1172
1173 result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
1174 image, device.size)
1175 result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
1176 (idx, instance.name, node_name))
1177 finally:
1178 logging.info("Resuming synchronization of disks for instance '%s'",
1179 instance.name)
1180
1181 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1182 (map(compat.snd, disks),
1183 instance),
1184 False)
1185
1186 if result.fail_msg:
1187 lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
1188 " node '%s'", node_name, result.fail_msg)
1189 else:
1190 for idx, success in enumerate(result.payload):
1191 if not success:
1192 lu.LogWarning("Failed to resume synchronization of disk '%d' of"
1193 " instance '%s'", idx, instance.name)
1194
1197 """Wrapper for L{WipeDisks} that handles errors.
1198
1199 @type lu: L{LogicalUnit}
1200 @param lu: the logical unit on whose behalf we execute
1201 @type instance: L{objects.Instance}
1202 @param instance: the instance whose disks we should wipe
1203 @param disks: see L{WipeDisks}
1204 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1205 case of error
1206 @raise errors.OpPrereqError: in case of failure
1207
1208 """
1209 try:
1210 WipeDisks(lu, instance, disks=disks)
1211 except errors.OpExecError:
1212 logging.warning("Wiping disks for instance '%s' failed",
1213 instance.name)
1214 _UndoCreateDisks(lu, cleanup, instance)
1215 raise
1216
1219 """Return the instance disks selected by the disks list
1220
1221 @type disks: list of L{objects.Disk} or None
1222 @param disks: selected disks
1223 @rtype: list of L{objects.Disk}
1224 @return: selected instance disks to act on
1225
1226 """
1227 if disks is None:
1228 return instance_disks
1229 else:
1230 inst_disks_uuids = [d.uuid for d in instance_disks]
1231 disks_uuids = [d.uuid for d in disks]
1232 if not set(disks_uuids).issubset(inst_disks_uuids):
1233 raise errors.ProgrammerError("Can only act on disks belonging to the"
1234 " target instance: expected a subset of %s,"
1235 " got %s" % (inst_disks_uuids, disks_uuids))
1236 return disks
1237
1238
1239 def WaitForSync(lu, instance, disks=None, oneshot=False):
1240 """Sleep and poll for an instance's disk to sync.
1241
1242 """
1243 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1244 if not inst_disks or disks is not None and not disks:
1245 return True
1246
1247 disks = ExpandCheckDisks(inst_disks, disks)
1248
1249 if not oneshot:
1250 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1251
1252 node_uuid = instance.primary_node
1253 node_name = lu.cfg.GetNodeName(node_uuid)
1254
1255
1256
1257 retries = 0
1258 degr_retries = 10
1259 while True:
1260 max_time = 0
1261 done = True
1262 cumul_degraded = False
1263 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1264 msg = rstats.fail_msg
1265 if msg:
1266 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1267 retries += 1
1268 if retries >= 10:
1269 raise errors.RemoteError("Can't contact node %s for mirror data,"
1270 " aborting." % node_name)
1271 time.sleep(6)
1272 continue
1273 rstats = rstats.payload
1274 retries = 0
1275 for i, mstat in enumerate(rstats):
1276 if mstat is None:
1277 lu.LogWarning("Can't compute data for node %s/%s",
1278 node_name, disks[i].iv_name)
1279 continue
1280
1281 cumul_degraded = (cumul_degraded or
1282 (mstat.is_degraded and mstat.sync_percent is None))
1283 if mstat.sync_percent is not None:
1284 done = False
1285 if mstat.estimated_time is not None:
1286 rem_time = ("%s remaining (estimated)" %
1287 utils.FormatSeconds(mstat.estimated_time))
1288 max_time = mstat.estimated_time
1289 else:
1290 rem_time = "no time estimate"
1291 max_time = 5
1292 lu.LogInfo("- device %s: %5.2f%% done, %s",
1293 disks[i].iv_name, mstat.sync_percent, rem_time)
1294
1295
1296
1297
1298 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1299 logging.info("Degraded disks found, %d retries left", degr_retries)
1300 degr_retries -= 1
1301 time.sleep(1)
1302 continue
1303
1304 if done or oneshot:
1305 break
1306
1307 time.sleep(min(60, max_time))
1308
1309 if done:
1310 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1311
1312 return not cumul_degraded
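# Note: the return value is True only if no disk was left degraded; disks
# that report as degraded without a sync percentage are retried for up to ten
# one-second sleeps (degr_retries) before the result is accepted.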
1313
1316 """Shutdown block devices of an instance.
1317
1318 This does the shutdown on all nodes of the instance.
1319
1320 If ignore_primary is false, errors on the primary node are not
1321 ignored.
1322
1323 Modifies the configuration of the instance, so the caller should re-read the
1324 instance configuration, if needed.
1325
1326 """
1327 all_result = True
1328
1329 if disks is None:
1330
1331 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1332 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1333 disks = ExpandCheckDisks(inst_disks, disks)
1334
1335 for disk in disks:
1336 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1337 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1338 msg = result.fail_msg
1339 if msg:
1340 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1341 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1342 if ((node_uuid == instance.primary_node and not ignore_primary) or
1343 (node_uuid != instance.primary_node and not result.offline)):
1344 all_result = False
1345 return all_result
1346
1349 """Shutdown block devices of an instance.
1350
1351 This function checks if an instance is running, before calling
1352 _ShutdownInstanceDisks.
1353
1354 """
1355 if req_states is None:
1356 req_states = INSTANCE_DOWN
1357 CheckInstanceState(lu, instance, req_states, msg="cannot shutdown disks")
1358 ShutdownInstanceDisks(lu, instance, disks=disks)
1359
1360
1361 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1362 ignore_size=False):
1363 """Prepare the block devices for an instance.
1364
1365 This sets up the block devices on all nodes.
1366
1367 Modifies the configuration of the instance, so the caller should re-read the
1368 instance configuration, if needed.
1369
1370 @type lu: L{LogicalUnit}
1371 @param lu: the logical unit on whose behalf we execute
1372 @type instance: L{objects.Instance}
1373 @param instance: the instance for whose disks we assemble
1374 @type disks: list of L{objects.Disk} or None
1375 @param disks: which disks to assemble (or all, if None)
1376 @type ignore_secondaries: boolean
1377 @param ignore_secondaries: if true, errors on secondary nodes
1378 won't result in an error return from the function
1379 @type ignore_size: boolean
1380 @param ignore_size: if true, the current known size of the disk
1381 will not be used during the disk activation, useful for cases
1382 when the size is wrong
1383 @return: False if the operation failed, otherwise a list of
1384 (host, instance_visible_name, node_visible_name)
1385 with the mapping from node devices to instance devices
1386
1387 """
1388 device_info = []
1389 disks_ok = True
1390
1391 if disks is None:
1392
1393 instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)
1394
1395 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1396 disks = ExpandCheckDisks(inst_disks, disks)
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
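# The devices are assembled in two passes: first on every node in secondary
# mode (is_primary=False), then once more on the primary node only, which
# also returns the device path that is reported back to the caller.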
1408 for idx, inst_disk in enumerate(disks):
1409 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1410 instance.primary_node):
1411 if ignore_size:
1412 node_disk = node_disk.Copy()
1413 node_disk.UnsetSize()
1414 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1415 instance, False, idx)
1416 msg = result.fail_msg
1417 if msg:
1418 secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance.uuid)
1419 is_offline_secondary = (node_uuid in secondary_nodes and
1420 result.offline)
1421 lu.LogWarning("Could not prepare block device %s on node %s"
1422 " (is_primary=False, pass=1): %s",
1423 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1424 if not (ignore_secondaries or is_offline_secondary):
1425 disks_ok = False
1426
1427
1428
1429
1430 for idx, inst_disk in enumerate(disks):
1431 dev_path = None
1432
1433 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1434 instance.primary_node):
1435 if node_uuid != instance.primary_node:
1436 continue
1437 if ignore_size:
1438 node_disk = node_disk.Copy()
1439 node_disk.UnsetSize()
1440 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1441 instance, True, idx)
1442 msg = result.fail_msg
1443 if msg:
1444 lu.LogWarning("Could not prepare block device %s on node %s"
1445 " (is_primary=True, pass=2): %s",
1446 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1447 disks_ok = False
1448 else:
1449 dev_path, _, __ = result.payload
1450
1451 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1452 inst_disk.iv_name, dev_path))
1453
1454 if not disks_ok:
1455 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1456
1457 return disks_ok, device_info
1458
1461 """Start the disks of an instance.
1462
1463 Modifies the configuration of the instance, so the caller should re-read the
1464 instance configuration, if needed.
1465
1466 """
1467 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1468 ignore_secondaries=force)
1469 if not disks_ok:
1470 ShutdownInstanceDisks(lu, instance)
1471 if force is not None and not force:
1472 lu.LogWarning("",
1473 hint=("If the message above refers to a secondary node,"
1474 " you can retry the operation using '--force'"))
1475 raise errors.OpExecError("Disk consistency error")
1476
1479 """Grow a disk of an instance.
1480
1481 """
1482 HPATH = "disk-grow"
1483 HTYPE = constants.HTYPE_INSTANCE
1484 REQ_BGL = False
1485
1492
1500
1502 """Build hooks env.
1503
1504 This runs on the master, the primary and all the secondaries.
1505
1506 """
1507 env = {
1508 "DISK": self.op.disk,
1509 "AMOUNT": self.op.amount,
1510 "ABSOLUTE": self.op.absolute,
1511 }
1512 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1513 return env
1514
1522
1524 """Check prerequisites.
1525
1526 This checks that the instance is in the cluster.
1527
1528 """
1529 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1530 assert self.instance is not None, \
1531 "Cannot retrieve locked instance %s" % self.op.instance_name
1532 node_uuids = list(self.cfg.GetInstanceNodes(self.instance.uuid))
1533 for node_uuid in node_uuids:
1534 CheckNodeOnline(self, node_uuid)
1535 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1536
1537 if self.instance.disk_template not in constants.DTS_GROWABLE:
1538 raise errors.OpPrereqError("Instance's disk layout does not support"
1539 " growing", errors.ECODE_INVAL)
1540
1541 self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk))
1542
1543 if self.op.absolute:
1544 self.target = self.op.amount
1545 self.delta = self.target - self.disk.size
1546 if self.delta < 0:
1547 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1548 "current disk size (%s)" %
1549 (utils.FormatUnit(self.target, "h"),
1550 utils.FormatUnit(self.disk.size, "h")),
1551 errors.ECODE_STATE)
1552 else:
1553 self.delta = self.op.amount
1554 self.target = self.disk.size + self.delta
1555 if self.delta < 0:
1556 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1557 utils.FormatUnit(self.delta, "h"),
1558 errors.ECODE_INVAL)
1559
1560 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1561
1562 self._CheckIPolicy(self.target)
1563
1574
1597
1598 def Exec(self, feedback_fn):
1599 """Execute disk grow.
1600
1601 """
1602 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1603 assert (self.owned_locks(locking.LEVEL_NODE) ==
1604 self.owned_locks(locking.LEVEL_NODE_RES))
1605
1606 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1607
1608 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1609 if not disks_ok:
1610 raise errors.OpExecError("Cannot activate block device to grow")
1611
1612 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1613 (self.op.disk, self.instance.name,
1614 utils.FormatUnit(self.delta, "h"),
1615 utils.FormatUnit(self.target, "h")))
1616
1617
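# The grow is first requested in dry-run mode on every node of the instance
# so that an early failure leaves the storage untouched; only then is it
# repeated for real, and finally once more on the primary node only (the two
# boolean arguments of call_blockdev_grow appear to select dry-run and
# backing/non-backing storage, judging by the error messages below).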
1618 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
1619 for node_uuid in inst_nodes:
1620 result = self.rpc.call_blockdev_grow(node_uuid,
1621 (self.disk, self.instance),
1622 self.delta, True, True,
1623 self.node_es_flags[node_uuid])
1624 result.Raise("Dry-run grow request failed to node %s" %
1625 self.cfg.GetNodeName(node_uuid))
1626
1627 if wipe_disks:
1628
1629 result = self.rpc.call_blockdev_getdimensions(
1630 self.instance.primary_node, [([self.disk], self.instance)])
1631 result.Raise("Failed to retrieve disk size from node '%s'" %
1632 self.instance.primary_node)
1633
1634 (disk_dimensions, ) = result.payload
1635
1636 if disk_dimensions is None:
1637 raise errors.OpExecError("Failed to retrieve disk size from primary"
1638 " node '%s'" % self.instance.primary_node)
1639 (disk_size_in_bytes, _) = disk_dimensions
1640
1641 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1642
1643 assert old_disk_size >= self.disk.size, \
1644 ("Retrieved disk size too small (got %s, should be at least %s)" %
1645 (old_disk_size, self.disk.size))
1646 else:
1647 old_disk_size = None
1648
1649
1650
1651 for node_uuid in inst_nodes:
1652 result = self.rpc.call_blockdev_grow(node_uuid,
1653 (self.disk, self.instance),
1654 self.delta, False, True,
1655 self.node_es_flags[node_uuid])
1656 result.Raise("Grow request failed to node %s" %
1657 self.cfg.GetNodeName(node_uuid))
1658
1659
1660 node_uuid = self.instance.primary_node
1661 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1662 self.delta, False, False,
1663 self.node_es_flags[node_uuid])
1664 result.Raise("Grow request failed to node %s" %
1665 self.cfg.GetNodeName(node_uuid))
1666
1667 self.disk.RecordGrow(self.delta)
1668 self.cfg.Update(self.instance, feedback_fn)
1669 self.cfg.Update(self.disk, feedback_fn)
1670
1671
1672 ReleaseLocks(self, locking.LEVEL_NODE)
1673
1674
1675 self.WConfdClient().DownGradeLocksLevel(
1676 locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])
1677
1678 assert wipe_disks ^ (old_disk_size is None)
1679
1680 if wipe_disks:
1681 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
1682 assert inst_disks[self.op.disk] == self.disk
1683
1684
1685 WipeDisks(self, self.instance,
1686 disks=[(self.op.disk, self.disk, old_disk_size)])
1687
1688 if self.op.wait_for_sync:
1689 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1690 if disk_abort:
1691 self.LogWarning("Disk syncing has not returned a good status; check"
1692 " the instance")
1693 if not self.instance.disks_active:
1694 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1695 elif not self.instance.disks_active:
1696 self.LogWarning("Not shutting down the disk even if the instance is"
1697 " not supposed to be running because no wait for"
1698 " sync mode was requested")
1699
1700 assert self.owned_locks(locking.LEVEL_NODE_RES)
1701 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1702
1705 """Replace the disks of an instance.
1706
1707 """
1708 HPATH = "mirrors-replace"
1709 HTYPE = constants.HTYPE_INSTANCE
1710 REQ_BGL = False
1711
1729
1731 self._ExpandAndLockInstance()
1732
1733 assert locking.LEVEL_NODE not in self.needed_locks
1734 assert locking.LEVEL_NODE_RES not in self.needed_locks
1735 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1736
1737 assert self.op.iallocator is None or self.op.remote_node is None, \
1738 "Conflicting options"
1739
1740 if self.op.remote_node is not None:
1741 (self.op.remote_node_uuid, self.op.remote_node) = \
1742 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1743 self.op.remote_node)
1744
1745
1746
1747
1748
1749 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1750 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1751 else:
1752 self.needed_locks[locking.LEVEL_NODE] = []
1753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1754
1755 if self.op.iallocator is not None:
1756
1757 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1758 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1759
1760 self.needed_locks[locking.LEVEL_NODE_RES] = []
1761
1762 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1763 self.op.instance_name, self.op.mode,
1764 self.op.iallocator, self.op.remote_node_uuid,
1765 self.op.disks, self.op.early_release,
1766 self.op.ignore_ipolicy)
1767
1768 self.tasklets = [self.replacer]
1769
1771 if level == locking.LEVEL_NODEGROUP:
1772 assert self.op.remote_node_uuid is None
1773 assert self.op.iallocator is not None
1774 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1775
1776 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1777
1778
1779 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1780 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1781
1782 elif level == locking.LEVEL_NODE:
1783 if self.op.iallocator is not None:
1784 assert self.op.remote_node_uuid is None
1785 assert not self.needed_locks[locking.LEVEL_NODE]
1786 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1787
1788
1789 self.needed_locks[locking.LEVEL_NODE] = \
1790 [node_uuid
1791 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1792 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1793 else:
1794 self._LockInstancesNodes()
1795
1796 elif level == locking.LEVEL_NODE_RES:
1797
1798 self.needed_locks[locking.LEVEL_NODE_RES] = \
1799 self.needed_locks[locking.LEVEL_NODE]
1800
1816
1818 """Build hooks nodes.
1819
1820 """
1821 instance = self.replacer.instance
1822 nl = [
1823 self.cfg.GetMasterNode(),
1824 instance.primary_node,
1825 ]
1826 if self.op.remote_node_uuid is not None:
1827 nl.append(self.op.remote_node_uuid)
1828 return nl, nl
1829
1840
1843 """Bring up an instance's disks.
1844
1845 """
1846 REQ_BGL = False
1847
1852
1856
1858 """Check prerequisites.
1859
1860 This checks that the instance is in the cluster.
1861
1862 """
1863 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1864 assert self.instance is not None, \
1865 "Cannot retrieve locked instance %s" % self.op.instance_name
1866 CheckNodeOnline(self, self.instance.primary_node)
1867
1868 def Exec(self, feedback_fn):
1884
1887 """Shutdown an instance's disks.
1888
1889 """
1890 REQ_BGL = False
1891
1896
1900
1902 """Check prerequisites.
1903
1904 This checks that the instance is in the cluster.
1905
1906 """
1907 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1908 assert self.instance is not None, \
1909 "Cannot retrieve locked instance %s" % self.op.instance_name
1910
1911 def Exec(self, feedback_fn):
1919
1923 """Check that mirrors are not degraded.
1924
1925 @attention: The device has to be annotated already.
1926
1927 The ldisk parameter, if True, will change the test from the
1928 is_degraded attribute (which represents overall non-ok status for
1929 the device(s)) to the ldisk (representing the local storage status).
1930
1931 """
1932 result = True
1933
1934 if on_primary or dev.AssembleOnSecondary():
1935 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1936 msg = rstats.fail_msg
1937 if msg:
1938 lu.LogWarning("Can't find disk on node %s: %s",
1939 lu.cfg.GetNodeName(node_uuid), msg)
1940 result = False
1941 elif not rstats.payload:
1942 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1943 result = False
1944 else:
1945 if ldisk:
1946 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1947 else:
1948 result = result and not rstats.payload.is_degraded
1949
1950 if dev.children:
1951 for child in dev.children:
1952 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1953 node_uuid, on_primary)
1954
1955 return result
1956
1965
1968 """Wrapper around call_blockdev_find to annotate diskparams.
1969
1970 @param lu: A reference to the lu object
1971 @param node_uuid: The node to call out
1972 @param dev: The device to find
1973 @param instance: The instance object the device belongs to
1974 @returns The result of the rpc call
1975
1976 """
1977 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1978 return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
1979
1982 """Generate a suitable LV name.
1983
1984 This will generate a logical volume name for the given instance.
1985
1986 """
1987 results = []
1988 for val in exts:
1989 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1990 results.append("%s%s" % (new_id, val))
1991 return results
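# Example: exts == [".disk0", ".disk1"] yields names of the form
# "<unique-id>.disk0" and "<unique-id>.disk1"; the DRBD code in
# GenerateDiskTemplate() further suffixes these with "_data" and "_meta".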
1992
1995 """Replaces disks for an instance.
1996
1997 Note: Locking is not within the scope of this class.
1998
1999 """
2000 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
2001 remote_node_uuid, disks, early_release, ignore_ipolicy):
2002 """Initializes this class.
2003
2004 """
2005 Tasklet.__init__(self, lu)
2006
2007
2008 self.instance_uuid = instance_uuid
2009 self.instance_name = instance_name
2010 self.mode = mode
2011 self.iallocator_name = iallocator_name
2012 self.remote_node_uuid = remote_node_uuid
2013 self.disks = disks
2014 self.early_release = early_release
2015 self.ignore_ipolicy = ignore_ipolicy
2016
2017
2018 self.instance = None
2019 self.new_node_uuid = None
2020 self.target_node_uuid = None
2021 self.other_node_uuid = None
2022 self.remote_node_info = None
2023 self.node_secondary_ip = None
2024
2025 @staticmethod
2026 def _RunAllocator(lu, iallocator_name, instance_uuid,
2027 relocate_from_node_uuids):
2028 """Compute a new secondary node using an IAllocator.
2029
2030 """
2031 req = iallocator.IAReqRelocate(
2032 inst_uuid=instance_uuid,
2033 relocate_from_node_uuids=list(relocate_from_node_uuids))
2034 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
2035
2036 ial.Run(iallocator_name)
2037
2038 if not ial.success:
2039 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
2040 " %s" % (iallocator_name, ial.info),
2041 errors.ECODE_NORES)
2042
2043 remote_node_name = ial.result[0]
2044 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
2045
2046 if remote_node is None:
2047 raise errors.OpPrereqError("Node %s not found in configuration" %
2048 remote_node_name, errors.ECODE_NOENT)
2049
2050 lu.LogInfo("Selected new secondary for instance '%s': %s",
2051 instance_uuid, remote_node_name)
2052
2053 return remote_node.uuid
2054
2061
2063 """Checks if the instance disks are activated.
2064
2065 @param instance: The instance to check disks
2066 @return: True if they are activated, False otherwise
2067
2068 """
2069 node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
2070
2071 for idx, dev in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
2072 for node_uuid in node_uuids:
2073 self.lu.LogInfo("Checking disk/%d on %s", idx,
2074 self.cfg.GetNodeName(node_uuid))
2075
2076 result = _BlockdevFind(self, node_uuid, dev, instance)
2077
2078 if result.offline:
2079 continue
2080 elif result.fail_msg or not result.payload:
2081 return False
2082
2083 return True
2084
2086 """Check prerequisites.
2087
2088 This checks that the instance is in the cluster.
2089
2090 """
2091 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
2092 assert self.instance is not None, \
2093 "Cannot retrieve locked instance %s" % self.instance_name
2094
2095 if self.instance.disk_template != constants.DT_DRBD8:
2096 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
2097 " instances", errors.ECODE_INVAL)
2098
2099 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
2100 if len(secondary_nodes) != 1:
2101 raise errors.OpPrereqError("The instance has a strange layout,"
2102 " expected one secondary but found %d" %
2103 len(secondary_nodes),
2104 errors.ECODE_FAULT)
2105
2106 secondary_node_uuid = secondary_nodes[0]
2107
2108 if self.iallocator_name is None:
2109 remote_node_uuid = self.remote_node_uuid
2110 else:
2111 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
2112 self.instance.uuid,
2113 secondary_nodes)
2114
2115 if remote_node_uuid is None:
2116 self.remote_node_info = None
2117 else:
2118 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
2119 "Remote node '%s' is not locked" % remote_node_uuid
2120
2121 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2122 assert self.remote_node_info is not None, \
2123 "Cannot retrieve locked node %s" % remote_node_uuid
2124
2125 if remote_node_uuid == self.instance.primary_node:
2126 raise errors.OpPrereqError("The specified node is the primary node of"
2127 " the instance", errors.ECODE_INVAL)
2128
2129 if remote_node_uuid == secondary_node_uuid:
2130 raise errors.OpPrereqError("The specified node is already the"
2131 " secondary node of the instance",
2132 errors.ECODE_INVAL)
2133
2134 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2135 constants.REPLACE_DISK_CHG):
2136 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2137 errors.ECODE_INVAL)
2138
2139 if self.mode == constants.REPLACE_DISK_AUTO:
2140 if not self._CheckDisksActivated(self.instance):
2141 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2142 " first" % self.instance_name,
2143 errors.ECODE_STATE)
2144 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2145 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2146
2147 if faulty_primary and faulty_secondary:
2148 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2149 " one node and can not be repaired"
2150 " automatically" % self.instance_name,
2151 errors.ECODE_STATE)
2152
2153 if faulty_primary:
2154 self.disks = faulty_primary
2155 self.target_node_uuid = self.instance.primary_node
2156 self.other_node_uuid = secondary_node_uuid
2157 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2158 elif faulty_secondary:
2159 self.disks = faulty_secondary
2160 self.target_node_uuid = secondary_node_uuid
2161 self.other_node_uuid = self.instance.primary_node
2162 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2163 else:
2164 self.disks = []
2165 check_nodes = []
2166
2167 else:
2168
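# Non-automatic modes: the target and other node are fixed by the requested mode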
2169 if self.mode == constants.REPLACE_DISK_PRI:
2170 self.target_node_uuid = self.instance.primary_node
2171 self.other_node_uuid = secondary_node_uuid
2172 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2173
2174 elif self.mode == constants.REPLACE_DISK_SEC:
2175 self.target_node_uuid = secondary_node_uuid
2176 self.other_node_uuid = self.instance.primary_node
2177 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2178
2179 elif self.mode == constants.REPLACE_DISK_CHG:
2180 self.new_node_uuid = remote_node_uuid
2181 self.other_node_uuid = self.instance.primary_node
2182 self.target_node_uuid = secondary_node_uuid
2183 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2184
2185 CheckNodeNotDrained(self.lu, remote_node_uuid)
2186 CheckNodeVmCapable(self.lu, remote_node_uuid)
2187
2188 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2189 assert old_node_info is not None
2190 if old_node_info.offline and not self.early_release:
2191
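# The old secondary is unreachable, so holding its locks until the end would not help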
2192 self.early_release = True
2193 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2194 " early-release mode", secondary_node_uuid)
2195
2196 else:
2197 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2198 self.mode)
2199
2200
2201 if not self.disks:
2202 self.disks = range(len(self.instance.disks))
2203
2204
2205
2206 if self.remote_node_info:
2207
2208 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2209 cluster = self.cfg.GetClusterInfo()
2210 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2211 new_group_info)
2212 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
2213 self.remote_node_info, self.cfg,
2214 ignore=self.ignore_ipolicy)
2215
2216 for node_uuid in check_nodes:
2217 CheckNodeOnline(self.lu, node_uuid)
2218
2219 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2220 self.other_node_uuid,
2221 self.target_node_uuid]
2222 if node_uuid is not None)
2223
2224
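# Release node locks that are no longer needed, keeping only the nodes touched by this replacement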
2225 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2226 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2227 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2228
2229
2230 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2231
2232
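# Check that the requested disk indices are valid for this instance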
2233 for disk_idx in self.disks:
2234 self.instance.FindDisk(disk_idx)
2235
2236
2237 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2238 in self.cfg.GetMultiNodeInfo(touched_nodes))
2239
2240 def Exec(self, feedback_fn):
2241 """Execute disk replacement.
2242
2243 This dispatches the disk replacement to the appropriate handler.
2244
2245 """
2246 if __debug__:
2247
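# Verify that the locks we own match the nodes computed in CheckPrereq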
2248 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2249 assert set(owned_nodes) == set(self.node_secondary_ip), \
2250 ("Incorrect node locks, owning %s, expected %s" %
2251 (owned_nodes, self.node_secondary_ip.keys()))
2252 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2253 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2254
2255 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2256 assert list(owned_instances) == [self.instance_name], \
2257 "Instance '%s' not locked" % self.instance_name
2258
2259 if not self.disks:
2260 feedback_fn("No disks need replacement for instance '%s'" %
2261 self.instance.name)
2262 return
2263
2264 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2265 (utils.CommaJoin(self.disks), self.instance.name))
2266 feedback_fn("Current primary node: %s" %
2267 self.cfg.GetNodeName(self.instance.primary_node))
2268 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
2269 feedback_fn("Current secondary node: %s" %
2270 utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))
2271
2272 activate_disks = not self.instance.disks_active
2273
2274
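# If the disks are not active (instance is down), activate them for the duration of the replacement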
2275 if activate_disks:
2276 StartInstanceDisks(self.lu, self.instance, True)
2277
2278 self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)
2279
2280 try:
2281
2282 if self.new_node_uuid is not None:
2283 fn = self._ExecDrbd8Secondary
2284 else:
2285 fn = self._ExecDrbd8DiskOnly
2286
2287 result = fn(feedback_fn)
2288 finally:
2289
2290
2291 if activate_disks:
2292 _SafeShutdownInstanceDisks(self.lu, self.instance,
2293 req_states=INSTANCE_NOT_RUNNING)
2294
2295 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2296
2297 if __debug__:
2298
2299 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2300 nodes = frozenset(self.node_secondary_ip)
2301 assert ((self.early_release and not owned_nodes) or
2302 (not self.early_release and not (set(owned_nodes) - nodes))), \
2303 ("Not owning the correct locks, early_release=%s, owned=%r,"
2304 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2305
2306 return result
2307
2308 def _CheckVolumeGroup(self, node_uuids):
2309 self.lu.LogInfo("Checking volume groups")
2310
2311 vgname = self.cfg.GetVGName()
2312
2313
2314 results = self.rpc.call_vg_list(node_uuids)
2315 if not results:
2316 raise errors.OpExecError("Can't list volume groups on the nodes")
2317
2318 for node_uuid in node_uuids:
2319 res = results[node_uuid]
2320 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2321 if vgname not in res.payload:
2322 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2323 (vgname, self.cfg.GetNodeName(node_uuid)))
2324
2325 def _CheckDisksExistence(self, node_uuids):
2326
2327 for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)):
2328 if idx not in self.disks:
2329 continue
2330
2331 for node_uuid in node_uuids:
2332 self.lu.LogInfo("Checking disk/%d on %s", idx,
2333 self.cfg.GetNodeName(node_uuid))
2334
2335 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2336
2337 msg = result.fail_msg
2338 if msg or not result.payload:
2339 if not msg:
2340 msg = "disk not found"
2341 if not self._CheckDisksActivated(self.instance):
2342 extra_hint = ("\nDisks seem to be not properly activated. Try"
2343 " running activate-disks on the instance before"
2344 " using replace-disks.")
2345 else:
2346 extra_hint = ""
2347 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2348 (idx, self.cfg.GetNodeName(node_uuid), msg,
2349 extra_hint))
2350
2365
2367 """Create new storage on the primary or secondary node.
2368
2369 This is only used for same-node replaces, not for changing the
2370 secondary node, hence we don't want to modify the existing disk.
2371
2372 """
2373 iv_names = {}
2374
2375 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
2376 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
2377 for idx, dev in enumerate(disks):
2378 if idx not in self.disks:
2379 continue
2380
2381 self.lu.LogInfo("Adding storage on %s for disk/%d",
2382 self.cfg.GetNodeName(node_uuid), idx)
2383
2384 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2385 names = _GenerateUniqueNames(self.lu, lv_names)
2386
2387 (data_disk, meta_disk) = dev.children
2388 vg_data = data_disk.logical_id[0]
2389 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2390 logical_id=(vg_data, names[0]),
2391 params=data_disk.params)
2392 vg_meta = meta_disk.logical_id[0]
2393 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2394 size=constants.DRBD_META_SIZE,
2395 logical_id=(vg_meta, names[1]),
2396 params=meta_disk.params)
2397
2398 new_lvs = [lv_data, lv_meta]
2399 old_lvs = [child.Copy() for child in dev.children]
2400 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2401 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2402
2403
2404 for new_lv in new_lvs:
2405 try:
2406 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2407 GetInstanceInfoText(self.instance), False,
2408 excl_stor)
2409 except errors.DeviceCreationError, e:
2410 raise errors.OpExecError("Can't create block device: %s" % e.message)
2411
2412 return iv_names
2413
2414 def _CheckDevices(self, node_uuid, iv_names):
2415 for name, (dev, _, _) in iv_names.iteritems():
2416 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2417
2418 msg = result.fail_msg
2419 if msg or not result.payload:
2420 if not msg:
2421 msg = "disk not found"
2422 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2423 (name, msg))
2424
2425 if result.payload.is_degraded:
2426 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2427
2428 def _RemoveOldStorage(self, node_uuid, iv_names):
2429 for name, (_, old_lvs, _) in iv_names.iteritems():
2430 self.lu.LogInfo("Remove logical volumes for %s", name)
2431
2432 for lv in old_lvs:
2433 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2434 .fail_msg
2435 if msg:
2436 self.lu.LogWarning("Can't remove old LV: %s", msg,
2437 hint="remove unused LVs manually")
2438
2440 """Replace a disk on the primary or secondary for DRBD 8.
2441
2442 The algorithm for replace is quite complicated:
2443
2444 1. for each disk to be replaced:
2445
2446 1. create new LVs on the target node with unique names
2447 1. detach old LVs from the drbd device
2448 1. rename old LVs to name_replaced.<time_t>
2449 1. rename new LVs to old LVs
2450 1. attach the new LVs (with the old names now) to the drbd device
2451
2452 1. wait for sync across all devices
2453
2454 1. for each modified disk:
2455
2456 1. remove old LVs (which have the name name_replaced.<time_t>)
2457
2458 Failures are not very well handled.
2459
2460 """
2461 steps_total = 6
2462
2463
2464 self.lu.LogStep(1, steps_total, "Check device existence")
2465 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2466 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2467
2468
2469 self.lu.LogStep(2, steps_total, "Check peer consistency")
2470 self._CheckDisksConsistency(
2471 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2472 False)
2473
2474
2475 self.lu.LogStep(3, steps_total, "Allocate new storage")
2476 iv_names = self._CreateNewStorage(self.target_node_uuid)
2477
2478
2479 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2480 for dev, old_lvs, new_lvs in iv_names.itervalues():
2481 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2482
2483 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2484 (dev, self.instance),
2485 (old_lvs, self.instance))
2486 result.Raise("Can't detach drbd from local storage on node"
2487 " %s for device %s" %
2488 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2489
2490
2491
2492
2493
2494
2495
2496
2497
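# The old LVs cannot simply be removed yet: the new LVs must first take over
# their names so that DRBD keeps pointing at the same device paths. Rename
# the old LVs out of the way, then rename the new LVs to the old names.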
2498 temp_suffix = int(time.time())
2499 ren_fn = lambda d, suff: (d.logical_id[0],
2500 d.logical_id[1] + "_replaced-%s" % suff)
2501
2502
2503 rename_old_to_new = []
2504 for to_ren in old_lvs:
2505 result = self.rpc.call_blockdev_find(self.target_node_uuid,
2506 (to_ren, self.instance))
2507 if not result.fail_msg and result.payload:
2508
2509 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2510
2511 self.lu.LogInfo("Renaming the old LVs on the target node")
2512 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2513 rename_old_to_new)
2514 result.Raise("Can't rename old LVs on node %s" %
2515 self.cfg.GetNodeName(self.target_node_uuid))
2516
2517
2518 self.lu.LogInfo("Renaming the new LVs on the target node")
2519 rename_new_to_old = [(new, old.logical_id)
2520 for old, new in zip(old_lvs, new_lvs)]
2521 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2522 rename_new_to_old)
2523 result.Raise("Can't rename new LVs on node %s" %
2524 self.cfg.GetNodeName(self.target_node_uuid))
2525
2526
2527 for old, new in zip(old_lvs, new_lvs):
2528 new.logical_id = old.logical_id
2529
2530
2531
2532
2533 for disk in old_lvs:
2534 disk.logical_id = ren_fn(disk, temp_suffix)
2535
2536
2537 self.lu.LogInfo("Adding new mirror component on %s",
2538 self.cfg.GetNodeName(self.target_node_uuid))
2539 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2540 (dev, self.instance),
2541 (new_lvs, self.instance))
2542 msg = result.fail_msg
2543 if msg:
2544 for new_lv in new_lvs:
2545 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2546 (new_lv, self.instance)).fail_msg
2547 if msg2:
2548 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2549 hint=("cleanup manually the unused logical"
2550 "volumes"))
2551 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2552
2553 cstep = itertools.count(5)
2554
2555 if self.early_release:
2556 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2557 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2558
2559 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2560 else:
2561
2562 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2563 keep=self.node_secondary_ip.keys())
2564
2565
2566 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2567
2568
2569
2570
2571
2572
2573
2574 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2575 WaitForSync(self.lu, self.instance)
2576
2577
2578 self._CheckDevices(self.instance.primary_node, iv_names)
2579
2580
2581 if not self.early_release:
2582 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2583 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2584
2586 """Replace the secondary node for DRBD 8.
2587
2588 The algorithm for replace is quite complicated:
2589 - for all disks of the instance:
2590 - create new LVs on the new node with the same names
2591 - shutdown the drbd device on the old secondary
2592 - disconnect the drbd network on the primary
2593 - create the drbd device on the new secondary
2594 - network attach the drbd on the primary, using an artifice:
2595 the drbd code for Attach() will connect to the network if it
2596 finds a device which is connected to the good local disks but
2597 not network enabled
2598 - wait for sync across all devices
2599 - remove all disks from the old secondary
2600
2601 Failures are not very well handled.
2602
2603 """
2604 steps_total = 6
2605
2606 pnode = self.instance.primary_node
2607
2608
2609 self.lu.LogStep(1, steps_total, "Check device existence")
2610 self._CheckDisksExistence([self.instance.primary_node])
2611 self._CheckVolumeGroup([self.instance.primary_node])
2612
2613
2614 self.lu.LogStep(2, steps_total, "Check peer consistency")
2615 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2616
2617
2618 self.lu.LogStep(3, steps_total, "Allocate new storage")
2619 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
2620 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
2621 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2622 self.new_node_uuid)
2623 for idx, dev in enumerate(disks):
2624 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2625 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2626
2627 for new_lv in dev.children:
2628 try:
2629 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2630 new_lv, True, GetInstanceInfoText(self.instance),
2631 False, excl_stor)
2632 except errors.DeviceCreationError, e:
2633 raise errors.OpExecError("Can't create block device: %s" % e.message)
2634
2635
2636
2637
2638 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2639 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2640 for _ in inst_disks],
2641 self.instance.uuid)
2642 logging.debug("Allocated minors %r", minors)
2643
2644 iv_names = {}
2645 for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
2646 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2647 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2648
2649
2650
2651
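# The DRBD logical_id is (node_A, node_B, port, minor_A, minor_B, secret);
# build a network-less ('alone') id first so the device can be created in
# standalone mode on the new node, and keep the networked id for later.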
2652 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2653 if self.instance.primary_node == o_node1:
2654 p_minor = o_minor1
2655 else:
2656 assert self.instance.primary_node == o_node2, "Three-node instance?"
2657 p_minor = o_minor2
2658
2659 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2660 p_minor, new_minor, o_secret)
2661 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2662 p_minor, new_minor, o_secret)
2663
2664 iv_names[idx] = (dev, dev.children, new_net_id)
2665 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2666 new_net_id)
2667 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2668 logical_id=new_alone_id,
2669 children=dev.children,
2670 size=dev.size,
2671 params={})
2672 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2673 self.cfg)
2674 try:
2675 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2676 anno_new_drbd,
2677 GetInstanceInfoText(self.instance), False,
2678 excl_stor)
2679 except errors.GenericError:
2680 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2681 raise
2682
2683
2684 for idx, dev in enumerate(inst_disks):
2685 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2686 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2687 (dev, self.instance)).fail_msg
2688 if msg:
2689 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2690 "node: %s" % (idx, msg),
2691 hint=("Please cleanup this device manually as"
2692 " soon as possible"))
2693
2694 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2695 result = self.rpc.call_drbd_disconnect_net(
2696 [pnode], (inst_disks, self.instance))[pnode]
2697
2698 msg = result.fail_msg
2699 if msg:
2700
2701 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2702 raise errors.OpExecError("Can't detach the disks from the network on"
2703 " old node: %s" % (msg,))
2704
2705
2706
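# The primary is now standalone; update the instance configuration so every
# disk points at the new secondary.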
2707 self.lu.LogInfo("Updating instance configuration")
2708 for dev, _, new_logical_id in iv_names.itervalues():
2709 dev.logical_id = new_logical_id
2710 self.cfg.Update(dev, feedback_fn)
2711
2712 self.cfg.Update(self.instance, feedback_fn)
2713
2714
2715 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2716
2717
2718 self.lu.LogInfo("Attaching primary drbds to new secondary"
2719 " (standalone => connected)")
2720 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
2721 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2722 self.new_node_uuid],
2723 (inst_disks, self.instance),
2724 self.instance.name,
2725 False)
2726 for to_node, to_result in result.items():
2727 msg = to_result.fail_msg
2728 if msg:
2729 raise errors.OpExecError(
2730 "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
2731 "info %s to see the status of disks)" %
2732 (self.cfg.GetNodeName(to_node), msg, self.instance.name))
2733
2734 cstep = itertools.count(5)
2735
2736 if self.early_release:
2737 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2738 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2739
2740 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2741 else:
2742
2743 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2744 keep=self.node_secondary_ip.keys())
2745
2746
2747
2748
2749
2750
2751
2752 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2753 WaitForSync(self.lu, self.instance)
2754
2755
2756 self._CheckDevices(self.instance.primary_node, iv_names)
2757
2758
2759 if not self.early_release:
2760 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2761 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2762
2765 """ Creates a new temporary bootable disk, and makes sure it is destroyed.
2766
2767 Is a context manager, and should be used with the ``with`` statement as such.
2768
2769 The disk is guaranteed to be created at index 0, shifting any other disks of
2770 the instance by one place, and allowing the instance to be booted with the
2771 content of the disk.
2772
2773 """
2774
2777 """ Constructor storing arguments until used later.
2778
2779 @type lu: L{ganeti.cmdlib.base.LogicalUnit}
2780 @param lu: The LU within which this disk is created.
2781
2782 @type instance: L{ganeti.objects.Instance}
2783 @param instance: The instance to which the disk should be added
2784
2785 @type disks: list of triples (disk template, disk access mode, int)
2786 @param disks:
2787 disk specification, which is a list of triples containing the
2788 disk template (e.g., L{constants.DT_PLAIN}), the disk access
2789 mode (i.e., L{constants.DISK_RDONLY} or L{constants.DISK_RDWR}),
2790 and size in MiB.
2791
2792 @type feedback_fn: function
2793 @param feedback_fn: Function used to log progress
2794
2795 """
2796 self._lu = lu
2797 self._instance = instance
2798 self._disks = disks
2799 self._feedback_fn = feedback_fn
2800 self._shutdown_timeout = shutdown_timeout
2801
2803 """ Ensures that the instance is down, and its disks inactive.
2804
2805 All the operations related to the creation and destruction of disks require
2806 that the instance is down and that the disks are inactive. This function is
2807 invoked to make it so.
2808
2809 """
2810
2811
2812
2813 self._feedback_fn("Shutting down instance")
2814 result = self._lu.rpc.call_instance_shutdown(self._instance.primary_node,
2815 self._instance,
2816 self._shutdown_timeout,
2817 self._lu.op.reason)
2818 result.Raise("Shutdown of instance '%s' while removing temporary disk "
2819 "failed" % self._instance.name)
2820
2821
2822
2823 if self._instance.disks_active:
2824 self._feedback_fn("Deactivating disks")
2825 ShutdownInstanceDisks(self._lu, self._instance)
2826
2828 """ Context manager entry function, creating the disk.
2829
2830 @rtype: L{ganeti.objects.Disk}
2831 @return: The disk object created.
2832
2833 """
2834 self._EnsureInstanceDiskState()
2835
2836 new_disks = []
2837
2838
2839
2840
2841
2842
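# Build one Disk object per requested (template, access mode, size) triple;
# the logical volume name is derived from the new disk's UUID inside the
# cluster's default volume group.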
2843 for idx, (disk_template, disk_access, disk_size) in enumerate(self._disks):
2844 new_disk = objects.Disk()
2845 new_disk.dev_type = disk_template
2846 new_disk.mode = disk_access
2847 new_disk.uuid = self._lu.cfg.GenerateUniqueID(self._lu.proc.GetECId())
2848 new_disk.logical_id = (self._lu.cfg.GetVGName(), new_disk.uuid)
2849 new_disk.params = {}
2850 new_disk.size = disk_size
2851
2852 new_disks.append(new_disk)
2853
2854 self._feedback_fn("Attempting to create temporary disk")
2855
2856 self._undoing_info = CreateDisks(self._lu, self._instance, disks=new_disks)
2857 for idx, new_disk in enumerate(new_disks):
2858 self._lu.cfg.AddInstanceDisk(self._instance.uuid, new_disk, idx=idx)
2859 self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)
2860
2861 self._feedback_fn("Temporary disk created")
2862
2863 self._new_disks = new_disks
2864
2865 return new_disks
2866
2867 def __exit__(self, exc_type, _value, _traceback):
2868 """ Context manager exit function, destroying the disk.
2869
2870 """
2871 if exc_type:
2872 self._feedback_fn("Exception raised, cleaning up temporary disk")
2873 else:
2874 self._feedback_fn("Regular cleanup of temporary disk")
2875
2876 try:
2877 self._EnsureInstanceDiskState()
2878
2879 _UndoCreateDisks(self._lu, self._undoing_info, self._instance)
2880
2881 for disk in self._new_disks:
2882 self._lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid)
2883 self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)
2884
2885 self._feedback_fn("Temporary disk removed")
2886 except:
2887 self._feedback_fn("Disk cleanup failed; it will have to be removed "
2888 "manually")
2889 raise
2890