1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Logical units dealing with storage of instances."""
32
33 import itertools
34 import logging
35 import os
36 import time
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ht
42 from ganeti import locking
43 from ganeti.masterd import iallocator
44 from ganeti import objects
45 from ganeti import utils
46 import ganeti.rpc.node as rpc
47 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
48 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
49 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
50 ComputeIPolicyDiskSizesViolation, \
51 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
52 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
53 CheckDiskTemplateEnabled
54 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
55 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
56 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
57
58 import ganeti.masterd.instance
59
60
61 _DISK_TEMPLATE_NAME_PREFIX = {
62 constants.DT_PLAIN: "",
63 constants.DT_RBD: ".rbd",
64 constants.DT_EXT: ".ext",
65 constants.DT_FILE: ".file",
66 constants.DT_SHARED_FILE: ".sharedfile",
67 }
72 """Create a single block device on a given node.
73
74 This will not recurse over children of the device, so they must be
75 created in advance.
76
77 @param lu: the lu on whose behalf we execute
78 @param node_uuid: the node on which to create the device
79 @type instance: L{objects.Instance}
80 @param instance: the instance which owns the device
81 @type device: L{objects.Disk}
82 @param device: the device to create
83 @param info: the extra 'metadata' we should attach to the device
84 (this will be represented as a LVM tag)
85 @type force_open: boolean
86 @param force_open: this parameter will be passes to the
87 L{backend.BlockdevCreate} function where it specifies
88 whether we run on primary or not, and it affects both
89 the child assembly and the device own Open() execution
90 @type excl_stor: boolean
91 @param excl_stor: Whether exclusive_storage is active for the node
92
93 """
94 result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
95 device.size, instance.name, force_open,
96 info, excl_stor)
97 result.Raise("Can't create block device %s on"
98 " node %s for instance %s" % (device,
99 lu.cfg.GetNodeName(node_uuid),
100 instance.name))
101
102
103 -def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
104 info, force_open, excl_stor):
105 """Create a tree of block devices on a given node.
106
107 If this device type has to be created on secondaries, create it and
108 all its children.
109
110 If not, just recurse to children keeping the same 'force' value.
111
112 @attention: The device has to be annotated already.
113
114 @param lu: the lu on whose behalf we execute
115 @param node_uuid: the node on which to create the device
116 @type instance: L{objects.Instance}
117 @param instance: the instance which owns the device
118 @type device: L{objects.Disk}
119 @param device: the device to create
120 @type force_create: boolean
121 @param force_create: whether to force creation of this device; this
122 will be change to True whenever we find a device which has
123 CreateOnSecondary() attribute
124 @param info: the extra 'metadata' we should attach to the device
125 (this will be represented as a LVM tag)
126 @type force_open: boolean
127 @param force_open: this parameter will be passes to the
128 L{backend.BlockdevCreate} function where it specifies
129 whether we run on primary or not, and it affects both
130 the child assembly and the device own Open() execution
131 @type excl_stor: boolean
132 @param excl_stor: Whether exclusive_storage is active for the node
133
134 @return: list of created devices
135 """
136 created_devices = []
137 try:
138 if device.CreateOnSecondary():
139 force_create = True
140
141 if device.children:
142 for child in device.children:
143 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
144 force_create, info, force_open, excl_stor)
145 created_devices.extend(devs)
146
147 if not force_create:
148 return created_devices
149
150 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
151 excl_stor)
152
153
154 created_devices = [(node_uuid, device)]
155 return created_devices
156
157 except errors.DeviceCreationError, e:
158 e.created_devices.extend(created_devices)
159 raise e
160 except errors.OpExecError, e:
161 raise errors.DeviceCreationError(str(e), created_devices)
162
165 """Whether exclusive_storage is in effect for the given node.
166
167 @type cfg: L{config.ConfigWriter}
168 @param cfg: The cluster configuration
169 @type node_uuid: string
170 @param node_uuid: The node UUID
171 @rtype: bool
172 @return: The effective value of exclusive_storage
173 @raise errors.OpPrereqError: if no node exists with the given name
174
175 """
176 ni = cfg.GetNodeInfo(node_uuid)
177 if ni is None:
178 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
179 errors.ECODE_NOENT)
180 return IsExclusiveStorageEnabledNode(cfg, ni)
181
182
183 -def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
184 force_open):
185 """Wrapper around L{_CreateBlockDevInner}.
186
187 This method annotates the root device first.
188
189 """
190 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
191 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
192 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
193 force_open, excl_stor)
194
197 """Undo the work performed by L{CreateDisks}.
198
199 This function is called in case of an error to undo the work of
200 L{CreateDisks}.
201
202 @type lu: L{LogicalUnit}
203 @param lu: the logical unit on whose behalf we execute
204 @param disks_created: the result returned by L{CreateDisks}
205 @type instance: L{objects.Instance}
206 @param instance: the instance for which disks were created
207
208 """
209 for (node_uuid, disk) in disks_created:
210 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
211 result.Warn("Failed to remove newly-created disk %s on node %s" %
212 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
214
215 -def CreateDisks(lu, instance, disk_template=None,
216 to_skip=None, target_node_uuid=None, disks=None):
217 """Create all disks for an instance.
218
219 This abstracts away some work from AddInstance.
220
221 Since the instance may not have been saved to the config file yet, this
222 function can not query the config file for the instance's disks; in that
223 case they need to be passed as an argument.
224
225 This function is also used by the disk template conversion mechanism to
226 create the new disks of the instance. Since the instance will have the
227 old template at the time we create the new disks, the new template must
228 be passed as an extra argument.
229
230 @type lu: L{LogicalUnit}
231 @param lu: the logical unit on whose behalf we execute
232 @type instance: L{objects.Instance}
233 @param instance: the instance whose disks we should create
234 @type disk_template: string
235 @param disk_template: if provided, overrides the instance's disk_template
236 @type to_skip: list
237 @param to_skip: list of indices to skip
238 @type target_node_uuid: string
239 @param target_node_uuid: if passed, overrides the target node for creation
240 @type disks: list of {objects.Disk}
241 @param disks: the disks to create; if not specified, all the disks of the
242 instance are created
243 @return: information about the created disks, to be used to call
244 L{_UndoCreateDisks}
245 @raise errors.OpPrereqError: in case of error
246
247 """
248 info = GetInstanceInfoText(instance)
249
250 if disks is None:
251 disks = lu.cfg.GetInstanceDisks(instance.uuid)
252
253 if target_node_uuid is None:
254 pnode_uuid = instance.primary_node
255
256
257
258 all_node_uuids = []
259 for disk in disks:
260 all_node_uuids.extend(disk.all_nodes)
261 all_node_uuids = set(all_node_uuids)
262
263 all_node_uuids.discard(pnode_uuid)
264 all_node_uuids = [pnode_uuid] + list(all_node_uuids)
265 else:
266 pnode_uuid = target_node_uuid
267 all_node_uuids = [pnode_uuid]
268
269 if disk_template is None:
270 disk_template = utils.GetDiskTemplate(disks)
271 if disk_template == constants.DT_MIXED:
272 raise errors.OpExecError("Creating disk for '%s' instances "
273 "only possible with explicit disk template."
274 % (constants.DT_MIXED,))
275
276 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), disk_template)
277
278 if disk_template in constants.DTS_FILEBASED:
279 file_storage_dir = os.path.dirname(disks[0].logical_id[1])
280 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
281
282 result.Raise("Failed to create directory '%s' on"
283 " node %s" % (file_storage_dir,
284 lu.cfg.GetNodeName(pnode_uuid)))
285
286 disks_created = []
287 for idx, device in enumerate(disks):
288 if to_skip and idx in to_skip:
289 continue
290 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
291 for node_uuid in all_node_uuids:
292 f_create = node_uuid == pnode_uuid
293 try:
294 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
295 f_create)
296 disks_created.append((node_uuid, device))
297 except errors.DeviceCreationError, e:
298 logging.warning("Creating disk %s for instance '%s' failed",
299 idx, instance.name)
300 disks_created.extend(e.created_devices)
301 _UndoCreateDisks(lu, disks_created, instance)
302 raise errors.OpExecError(e.message)
303 return disks_created
304
307 """Compute disk size requirements in the volume group
308
309 """
310 def _compute(disks, payload):
311 """Universal algorithm.
312
313 """
314 vgs = {}
315 for disk in disks:
316 vg_name = disk[constants.IDISK_VG]
317 vgs[vg_name] = \
318 vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload
319
320 return vgs
321
322
323 req_size_dict = {
324 constants.DT_DISKLESS: {},
325 constants.DT_PLAIN: _compute(disks, 0),
326
327 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
328 constants.DT_FILE: {},
329 constants.DT_SHARED_FILE: {},
330 constants.DT_GLUSTER: {},
331 }
332
333 if disk_template not in req_size_dict:
334 raise errors.ProgrammerError("Disk template '%s' size requirement"
335 " is unknown" % disk_template)
336
337 return req_size_dict[disk_template]
338
341 """Computes the instance disks.
342
343 @type disks: list of dictionaries
344 @param disks: The disks' input dictionary
345 @type disk_template: string
346 @param disk_template: The disk template of the instance
347 @type default_vg: string
348 @param default_vg: The default_vg to assume
349
350 @return: The computed disks
351
352 """
353 new_disks = []
354 for disk in disks:
355 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
356 if mode not in constants.DISK_ACCESS_SET:
357 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
358 mode, errors.ECODE_INVAL)
359 size = disk.get(constants.IDISK_SIZE, None)
360 if size is None:
361 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
362 try:
363 size = int(size)
364 except (TypeError, ValueError):
365 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
366 errors.ECODE_INVAL)
367
368 CheckDiskExtProvider(disk, disk_template)
369
370 data_vg = disk.get(constants.IDISK_VG, default_vg)
371 name = disk.get(constants.IDISK_NAME, None)
372 if name is not None and name.lower() == constants.VALUE_NONE:
373 name = None
374 new_disk = {
375 constants.IDISK_SIZE: size,
376 constants.IDISK_MODE: mode,
377 constants.IDISK_VG: data_vg,
378 constants.IDISK_NAME: name,
379 constants.IDISK_TYPE: disk_template,
380 }
381
382 for key in [
383 constants.IDISK_METAVG,
384 constants.IDISK_ADOPT,
385 constants.IDISK_SPINDLES,
386 ]:
387 if key in disk:
388 new_disk[key] = disk[key]
389
390
391 if (disk_template in constants.DTS_HAVE_ACCESS and
392 constants.IDISK_ACCESS in disk):
393 new_disk[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS]
394
395
396
397 if disk_template == constants.DT_EXT:
398 new_disk[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
399 for key in disk:
400 if key not in constants.IDISK_PARAMS:
401 new_disk[key] = disk[key]
402
403 new_disks.append(new_disk)
404
405 return new_disks
406
409 """Computes the new instance's disks for the template conversion.
410
411 This method is used by the disks template conversion mechanism. Using the
412 'ComputeDisks' method as an auxiliary method computes the disks that will be
413 used for generating the new disk template of the instance. It computes the
414 size, mode, and name parameters from the instance's current disks, such as
415 the volume group and the access parameters for the templates that support
416 them. For conversions targeting an extstorage template, the mandatory
417 provider's name or any user-provided extstorage parameters will also be
418 included in the result.
419
420 @type disks: list of {objects.Disk}
421 @param disks: The current disks of the instance
422 @type disk_template: string
423 @param disk_template: The disk template of the instance
424 @type default_vg: string
425 @param default_vg: The default volume group to assume
426 @type ext_params: dict
427 @param ext_params: The extstorage parameters
428
429 @rtype: list of dictionaries
430 @return: The computed disks' information for the new template
431
432 """
433
434 for key in ext_params.keys():
435 if key != constants.IDISK_PROVIDER:
436 assert key not in constants.IDISK_PARAMS, \
437 "Invalid extstorage parameter '%s'" % key
438
439
440 inst_disks = [dict((key, value) for key, value in disk.iteritems()
441 if key in constants.IDISK_PARAMS)
442 for disk in map(objects.Disk.ToDict, disks)]
443
444
445 for disk in inst_disks:
446 disk.update(ext_params)
447
448
449 new_disks = ComputeDisks(inst_disks, disk_template, default_vg)
450
451
452 for disk, new_disk in zip(disks, new_disks):
453
454
455 if (disk.dev_type == disk_template and
456 disk_template == constants.DT_EXT):
457 provider = new_disk[constants.IDISK_PROVIDER]
458 if provider == disk.params[constants.IDISK_PROVIDER]:
459 raise errors.OpPrereqError("Not converting, '%s' of type ExtStorage"
460 " already using provider '%s'" %
461 (disk.iv_name, provider), errors.ECODE_INVAL)
462
463
464
465 if (disk_template in constants.DTS_HAVE_ACCESS and
466 constants.IDISK_ACCESS in disk.params):
467 new_disk[constants.IDISK_ACCESS] = disk.params[constants.IDISK_ACCESS]
468
469
470 if disk_template in constants.DTS_LVM:
471 if disk.dev_type == constants.DT_PLAIN:
472 new_disk[constants.IDISK_VG] = disk.logical_id[0]
473 elif disk.dev_type == constants.DT_DRBD8:
474 new_disk[constants.IDISK_VG] = disk.children[0].logical_id[0]
475
476 return new_disks
477
481 """Calculate final instance file storage dir.
482
483 @type disk_type: disk template
484 @param disk_type: L{constants.DT_FILE}, L{constants.DT_SHARED_FILE}, or
485 L{constants.DT_GLUSTER}
486
487 @type cfg: ConfigWriter
488 @param cfg: the configuration that is to be used.
489 @type file_storage_dir: path
490 @param file_storage_dir: the path below the configured base.
491 @type instance_name: string
492 @param instance_name: name of the instance this disk is for.
493
494 @rtype: string
495 @return: The file storage directory for the instance
496
497 """
498
499 instance_file_storage_dir = None
500 if disk_type in constants.DTS_FILEBASED:
501
502 joinargs = []
503
504 cfg_storage = None
505 if disk_type == constants.DT_FILE:
506 cfg_storage = cfg.GetFileStorageDir()
507 elif disk_type == constants.DT_SHARED_FILE:
508 cfg_storage = cfg.GetSharedFileStorageDir()
509 elif disk_type == constants.DT_GLUSTER:
510 cfg_storage = cfg.GetGlusterStorageDir()
511
512 if not cfg_storage:
513 raise errors.OpPrereqError(
514 "Cluster file storage dir for {tpl} storage type not defined".format(
515 tpl=repr(disk_type)
516 ),
517 errors.ECODE_STATE)
518
519 joinargs.append(cfg_storage)
520
521 if file_storage_dir is not None:
522 joinargs.append(file_storage_dir)
523
524 if disk_type != constants.DT_GLUSTER:
525 joinargs.append(instance_name)
526
527 if len(joinargs) > 1:
528
529 instance_file_storage_dir = utils.PathJoin(*joinargs)
530 else:
531 instance_file_storage_dir = joinargs[0]
532
533 return instance_file_storage_dir
534
537 """Compute disk size requirements inside the RADOS cluster.
538
539 """
540
541 pass
542
543
544 -def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
545 iv_name, forthcoming=False):
546 """Generate a drbd8 device complete with its children.
547
548 """
549 assert len(vgnames) == len(names) == 2
550 port = lu.cfg.AllocatePort()
551 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
552
553 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
554 logical_id=(vgnames[0], names[0]),
555 nodes=[primary_uuid, secondary_uuid],
556 params={}, forthcoming=forthcoming)
557 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
558 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
559 size=constants.DRBD_META_SIZE,
560 logical_id=(vgnames[1], names[1]),
561 nodes=[primary_uuid, secondary_uuid],
562 params={}, forthcoming=forthcoming)
563 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
564
565 drbd_uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
566 minors = lu.cfg.AllocateDRBDMinor([primary_uuid, secondary_uuid], drbd_uuid)
567 assert len(minors) == 2
568 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
569 logical_id=(primary_uuid, secondary_uuid, port,
570 minors[0], minors[1],
571 shared_secret),
572 children=[dev_data, dev_meta],
573 nodes=[primary_uuid, secondary_uuid],
574 iv_name=iv_name, params={},
575 forthcoming=forthcoming)
576 drbd_dev.uuid = drbd_uuid
577 return drbd_dev
578
579
580 -def GenerateDiskTemplate(
581 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
582 disk_info, file_storage_dir, file_driver, base_index,
583 feedback_fn, full_disk_params, forthcoming=False):
584 """Generate the entire disk layout for a given template type.
585
586 """
587 vgname = lu.cfg.GetVGName()
588 disk_count = len(disk_info)
589 disks = []
590
591 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
592
593 if template_name == constants.DT_DISKLESS:
594 pass
595 elif template_name == constants.DT_DRBD8:
596 if len(secondary_node_uuids) != 1:
597 raise errors.ProgrammerError("Wrong template configuration")
598 remote_node_uuid = secondary_node_uuids[0]
599
600 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
601 full_disk_params)
602 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
603
604 names = []
605 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
606 for i in range(disk_count)]):
607 names.append(lv_prefix + "_data")
608 names.append(lv_prefix + "_meta")
609 for idx, disk in enumerate(disk_info):
610 disk_index = idx + base_index
611 data_vg = disk.get(constants.IDISK_VG, vgname)
612 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
613 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
614 disk[constants.IDISK_SIZE],
615 [data_vg, meta_vg],
616 names[idx * 2:idx * 2 + 2],
617 "disk/%d" % disk_index,
618 forthcoming=forthcoming)
619 disk_dev.mode = disk[constants.IDISK_MODE]
620 disk_dev.name = disk.get(constants.IDISK_NAME, None)
621 disk_dev.dev_type = template_name
622 disks.append(disk_dev)
623 else:
624 if secondary_node_uuids:
625 raise errors.ProgrammerError("Wrong template configuration")
626
627 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
628 if name_prefix is None:
629 names = None
630 else:
631 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
632 (name_prefix, base_index + i)
633 for i in range(disk_count)])
634 disk_nodes = []
635
636 if template_name == constants.DT_PLAIN:
637
638 def logical_id_fn(idx, _, disk):
639 vg = disk.get(constants.IDISK_VG, vgname)
640 return (vg, names[idx])
641
642 disk_nodes = [primary_node_uuid]
643
644 elif template_name == constants.DT_GLUSTER:
645 logical_id_fn = lambda _1, disk_index, _2: \
646 (file_driver, "ganeti/%s.%d" % (instance_uuid,
647 disk_index))
648
649 elif template_name in constants.DTS_FILEBASED:
650 logical_id_fn = \
651 lambda _, disk_index, disk: (file_driver,
652 "%s/%s" % (file_storage_dir,
653 names[idx]))
654 if template_name == constants.DT_FILE:
655 disk_nodes = [primary_node_uuid]
656
657 elif template_name == constants.DT_BLOCK:
658 logical_id_fn = \
659 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
660 disk[constants.IDISK_ADOPT])
661 elif template_name == constants.DT_RBD:
662 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
663 elif template_name == constants.DT_EXT:
664 def logical_id_fn(idx, _, disk):
665 provider = disk.get(constants.IDISK_PROVIDER, None)
666 if provider is None:
667 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
668 " not found", constants.DT_EXT,
669 constants.IDISK_PROVIDER)
670 return (provider, names[idx])
671 else:
672 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
673
674 dev_type = template_name
675
676 for idx, disk in enumerate(disk_info):
677 params = {}
678
679 if template_name == constants.DT_EXT:
680 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
681 for key in disk:
682 if key not in constants.IDISK_PARAMS:
683 params[key] = disk[key]
684
685 if (template_name in constants.DTS_HAVE_ACCESS and
686 constants.IDISK_ACCESS in disk):
687 params[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS]
688 disk_index = idx + base_index
689 size = disk[constants.IDISK_SIZE]
690 feedback_fn("* disk %s, size %s" %
691 (disk_index, utils.FormatUnit(size, "h")))
692 disk_dev = objects.Disk(dev_type=dev_type, size=size,
693 logical_id=logical_id_fn(idx, disk_index, disk),
694 iv_name="disk/%d" % disk_index,
695 mode=disk[constants.IDISK_MODE],
696 params=params, nodes=disk_nodes,
697 spindles=disk.get(constants.IDISK_SPINDLES),
698 forthcoming=forthcoming)
699 disk_dev.name = disk.get(constants.IDISK_NAME, None)
700 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
701 disks.append(disk_dev)
702
703 return disks
704
707 """Recursively remove the forthcoming flag
708
709 """
710 for disk in disks:
711 disk.forthcoming = False
712 CommitDisks(disk.children)
713
716 """Check the presence of the spindle options with exclusive_storage.
717
718 @type diskdict: dict
719 @param diskdict: disk parameters
720 @type es_flag: bool
721 @param es_flag: the effective value of the exlusive_storage flag
722 @type required: bool
723 @param required: whether spindles are required or just optional
724 @raise errors.OpPrereqError when spindles are given and they should not
725
726 """
727 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
728 diskdict[constants.IDISK_SPINDLES] is not None):
729 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
730 " when exclusive storage is not active",
731 errors.ECODE_INVAL)
732 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
733 diskdict[constants.IDISK_SPINDLES] is None)):
734 raise errors.OpPrereqError("You must specify spindles in instance disks"
735 " when exclusive storage is active",
736 errors.ECODE_INVAL)
737
760
763 """Recreate an instance's missing disks.
764
765 """
766 HPATH = "instance-recreate-disks"
767 HTYPE = constants.HTYPE_INSTANCE
768 REQ_BGL = False
769
770 _MODIFYABLE = compat.UniqueFrozenset([
771 constants.IDISK_SIZE,
772 constants.IDISK_MODE,
773 constants.IDISK_SPINDLES,
774 ])
775
776
777 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
778 constants.IDISK_ADOPT,
779
780
781 constants.IDISK_VG,
782 constants.IDISK_METAVG,
783 constants.IDISK_PROVIDER,
784 constants.IDISK_NAME,
785 constants.IDISK_ACCESS,
786 constants.IDISK_TYPE,
787 ]))
788
790 """Run the allocator based on input opcode.
791
792 """
793 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812 spindle_use = be_full[constants.BE_SPINDLE_USE]
813 disk_template = self.cfg.GetInstanceDiskTemplate(self.instance.uuid)
814 disks = [{
815 constants.IDISK_SIZE: d.size,
816 constants.IDISK_MODE: d.mode,
817 constants.IDISK_SPINDLES: d.spindles,
818 constants.IDISK_TYPE: d.dev_type
819 } for d in self.cfg.GetInstanceDisks(self.instance.uuid)]
820 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
821 disk_template=disk_template,
822 group_name=None,
823 tags=list(self.instance.GetTags()),
824 os=self.instance.os,
825 nics=[{}],
826 vcpus=be_full[constants.BE_VCPUS],
827 memory=be_full[constants.BE_MAXMEM],
828 spindle_use=spindle_use,
829 disks=disks,
830 hypervisor=self.instance.hypervisor,
831 node_whitelist=None)
832 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
833
834 ial.Run(self.op.iallocator)
835
836 assert req.RequiredNodes() == \
837 len(self.cfg.GetInstanceNodes(self.instance.uuid))
838
839 if not ial.success:
840 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
841 " %s" % (self.op.iallocator, ial.info),
842 errors.ECODE_NORES)
843
844 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
845 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
846 self.op.instance_name, self.op.iallocator,
847 utils.CommaJoin(self.op.nodes))
848
873
875 self._ExpandAndLockInstance()
876 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
877
878 if self.op.nodes:
879 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
880 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
881 else:
882 self.needed_locks[locking.LEVEL_NODE] = []
883 if self.op.iallocator:
884
885 self.needed_locks[locking.LEVEL_NODEGROUP] = []
886
887 self.needed_locks[locking.LEVEL_NODE_RES] = []
888
889 self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
890 self.dont_collate_locks[locking.LEVEL_NODE] = True
891 self.dont_collate_locks[locking.LEVEL_NODE_RES] = True
892
894 if level == locking.LEVEL_NODEGROUP:
895 assert self.op.iallocator is not None
896 assert not self.op.nodes
897 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
898 self.share_locks[locking.LEVEL_NODEGROUP] = 1
899
900
901
902 self.needed_locks[locking.LEVEL_NODEGROUP] = \
903 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
904
905 elif level == locking.LEVEL_NODE:
906
907
908
909
910
911 if self.op.iallocator:
912 assert not self.op.nodes
913 assert not self.needed_locks[locking.LEVEL_NODE]
914 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
915
916
917 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
918 self.needed_locks[locking.LEVEL_NODE].extend(
919 self.cfg.GetNodeGroup(group_uuid).members)
920
921 elif not self.op.nodes:
922 self._LockInstancesNodes(primary_only=False)
923 elif level == locking.LEVEL_NODE_RES:
924
925 self.needed_locks[locking.LEVEL_NODE_RES] = \
926 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
927
935
943
945 """Check prerequisites.
946
947 This checks that the instance is in the cluster and is not running.
948
949 """
950 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
951 assert instance is not None, \
952 "Cannot retrieve locked instance %s" % self.op.instance_name
953 if self.op.node_uuids:
954 inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
955 if len(self.op.node_uuids) != len(inst_nodes):
956 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
957 " %d replacement nodes were specified" %
958 (instance.name, len(inst_nodes),
959 len(self.op.node_uuids)),
960 errors.ECODE_INVAL)
961 disks = self.cfg.GetInstanceDisks(instance.uuid)
962 assert (not utils.AnyDiskOfType(disks, [constants.DT_DRBD8]) or
963 len(self.op.node_uuids) == 2)
964 assert (not utils.AnyDiskOfType(disks, [constants.DT_PLAIN]) or
965 len(self.op.node_uuids) == 1)
966 primary_node = self.op.node_uuids[0]
967 else:
968 primary_node = instance.primary_node
969 if not self.op.iallocator:
970 CheckNodeOnline(self, primary_node)
971
972 if not instance.disks:
973 raise errors.OpPrereqError("Instance '%s' has no disks" %
974 self.op.instance_name, errors.ECODE_INVAL)
975
976
977 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
978 if owned_groups:
979
980
981 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
982 primary_only=True)
983
984
985
986 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
987 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
988 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
989 msg="cannot recreate disks")
990
991 if self.op.disks:
992 self.disks = dict(self.op.disks)
993 else:
994 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
995
996 maxidx = max(self.disks.keys())
997 if maxidx >= len(instance.disks):
998 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
999 errors.ECODE_INVAL)
1000
1001 if ((self.op.node_uuids or self.op.iallocator) and
1002 sorted(self.disks.keys()) != range(len(instance.disks))):
1003 raise errors.OpPrereqError("Can't recreate disks partially and"
1004 " change the nodes at the same time",
1005 errors.ECODE_INVAL)
1006
1007 self.instance = instance
1008
1009 if self.op.iallocator:
1010 self._RunAllocator()
1011
1012 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
1013 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
1014
1015 if self.op.node_uuids:
1016 node_uuids = self.op.node_uuids
1017 else:
1018 node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
1019 excl_stor = compat.any(
1020 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
1021 )
1022 for new_params in self.disks.values():
1023 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
1024
1025 - def Exec(self, feedback_fn):
1026 """Recreate the disks.
1027
1028 """
1029 assert (self.owned_locks(locking.LEVEL_NODE) ==
1030 self.owned_locks(locking.LEVEL_NODE_RES))
1031
1032 to_skip = []
1033 mods = []
1034
1035 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
1036 for idx, disk in enumerate(inst_disks):
1037 try:
1038 changes = self.disks[idx]
1039 except KeyError:
1040
1041 to_skip.append(idx)
1042 continue
1043
1044
1045 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
1046
1047 assert len(self.op.node_uuids) == 2
1048 assert len(disk.logical_id) == 6
1049
1050 (_, _, old_port, _, _, old_secret) = disk.logical_id
1051 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
1052 disk.uuid)
1053 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
1054 new_minors[0], new_minors[1], old_secret)
1055 assert len(disk.logical_id) == len(new_id)
1056 else:
1057 new_id = None
1058
1059 mods.append((idx, new_id, changes))
1060
1061
1062
1063 for idx, new_id, changes in mods:
1064 disk = inst_disks[idx]
1065 if new_id is not None:
1066 assert disk.dev_type == constants.DT_DRBD8
1067 disk.logical_id = new_id
1068 if changes:
1069 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
1070 mode=changes.get(constants.IDISK_MODE, None),
1071 spindles=changes.get(constants.IDISK_SPINDLES, None))
1072 self.cfg.Update(disk, feedback_fn)
1073
1074
1075 if self.op.node_uuids:
1076 self.LogWarning("Changing the instance's nodes, you will have to"
1077 " remove any disks left on the older nodes manually")
1078 self.instance.primary_node = self.op.node_uuids[0]
1079 self.cfg.Update(self.instance, feedback_fn)
1080 for disk in inst_disks:
1081 self.cfg.SetDiskNodes(disk.uuid, self.op.node_uuids)
1082
1083
1084 mylocks = self.owned_locks(locking.LEVEL_NODE)
1085 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
1086 assert mylocks.issuperset(frozenset(inst_nodes))
1087 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
1088
1089
1090 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
1091 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
1092 wipedisks = [(idx, disk, 0)
1093 for (idx, disk) in enumerate(inst_disks)
1094 if idx not in to_skip]
1095 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
1096 cleanup=new_disks)
1097
1118
1121 """Checks the vg capacity for a given node.
1122
1123 @type node_info: tuple (_, list of dicts, _)
1124 @param node_info: the result of the node info call for one node
1125 @type node_name: string
1126 @param node_name: the name of the node
1127 @type vg: string
1128 @param vg: volume group name
1129 @type requested: int
1130 @param requested: the amount of disk in MiB to check for
1131 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1132 or we cannot check the node
1133
1134 """
1135 (_, space_info, _) = node_info
1136 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
1137 space_info, constants.ST_LVM_VG)
1138 if not lvm_vg_info:
1139 raise errors.OpPrereqError("Can't retrieve storage information for LVM",
1140 errors.ECODE_ENVIRON)
1141 vg_free = lvm_vg_info.get("storage_free", None)
1142 if not isinstance(vg_free, int):
1143 raise errors.OpPrereqError("Can't compute free disk space on node"
1144 " %s for vg %s, result was '%s'" %
1145 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
1146 if requested > vg_free:
1147 raise errors.OpPrereqError("Not enough disk space on target node %s"
1148 " vg %s: required %d MiB, available %d MiB" %
1149 (node_name, vg, requested, vg_free),
1150 errors.ECODE_NORES)
1151
1154 """Checks if nodes have enough free disk space in the specified VG.
1155
1156 This function checks if all given nodes have the needed amount of
1157 free disk. In case any node has less disk or we cannot get the
1158 information from the node, this function raises an OpPrereqError
1159 exception.
1160
1161 @type lu: C{LogicalUnit}
1162 @param lu: a logical unit from which we get configuration data
1163 @type node_uuids: C{list}
1164 @param node_uuids: the list of node UUIDs to check
1165 @type vg: C{str}
1166 @param vg: the volume group to check
1167 @type requested: C{int}
1168 @param requested: the amount of disk in MiB to check for
1169 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1170 or we cannot check the node
1171
1172 """
1173 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
1174 for node_uuid in node_uuids:
1175 node_name = lu.cfg.GetNodeName(node_uuid)
1176 info = nodeinfo[node_uuid]
1177 info.Raise("Cannot get current information from node %s" % node_name,
1178 prereq=True, ecode=errors.ECODE_ENVIRON)
1179 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
1180
1183 """Checks if nodes have enough free disk space in all the VGs.
1184
1185 This function checks if all given nodes have the needed amount of
1186 free disk. In case any node has less disk or we cannot get the
1187 information from the node, this function raises an OpPrereqError
1188 exception.
1189
1190 @type lu: C{LogicalUnit}
1191 @param lu: a logical unit from which we get configuration data
1192 @type node_uuids: C{list}
1193 @param node_uuids: the list of node UUIDs to check
1194 @type req_sizes: C{dict}
1195 @param req_sizes: the hash of vg and corresponding amount of disk in
1196 MiB to check for
1197 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1198 or we cannot check the node
1199
1200 """
1201 for vg, req_size in req_sizes.items():
1202 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
1203
1206 """Converts a disk size in bytes to mebibytes.
1207
1208 Warns and rounds up if the size isn't an even multiple of 1 MiB.
1209
1210 """
1211 (mib, remainder) = divmod(size, 1024 * 1024)
1212
1213 if remainder != 0:
1214 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
1215 " to not overwrite existing data (%s bytes will not be"
1216 " wiped)", (1024 * 1024) - remainder)
1217 mib += 1
1218
1219 return mib
1220
1221
1222 -def _CalcEta(time_taken, written, total_size):
1223 """Calculates the ETA based on size written and total size.
1224
1225 @param time_taken: The time taken so far
1226 @param written: amount written so far
1227 @param total_size: The total size of data to be written
1228 @return: The remaining time in seconds
1229
1230 """
1231 avg_time = time_taken / float(written)
1232 return (total_size - written) * avg_time
1233
1234
1235 -def WipeDisks(lu, instance, disks=None):
1236 """Wipes instance disks.
1237
1238 @type lu: L{LogicalUnit}
1239 @param lu: the logical unit on whose behalf we execute
1240 @type instance: L{objects.Instance}
1241 @param instance: the instance whose disks we should create
1242 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1243 @param disks: Disk details; tuple contains disk index, disk object and the
1244 start offset
1245
1246 """
1247 node_uuid = instance.primary_node
1248 node_name = lu.cfg.GetNodeName(node_uuid)
1249
1250 if disks is None:
1251 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1252 disks = [(idx, disk, 0)
1253 for (idx, disk) in enumerate(inst_disks)]
1254
1255 logging.info("Pausing synchronization of disks of instance '%s'",
1256 instance.name)
1257 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1258 (map(compat.snd, disks),
1259 instance),
1260 True)
1261 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1262
1263 for idx, success in enumerate(result.payload):
1264 if not success:
1265 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1266 " failed", idx, instance.name)
1267
1268 try:
1269 for (idx, device, offset) in disks:
1270
1271
1272 wipe_chunk_size = \
1273 int(min(constants.MAX_WIPE_CHUNK,
1274 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1275
1276 size = device.size
1277 last_output = 0
1278 start_time = time.time()
1279
1280 if offset == 0:
1281 info_text = ""
1282 else:
1283 info_text = (" (from %s to %s)" %
1284 (utils.FormatUnit(offset, "h"),
1285 utils.FormatUnit(size, "h")))
1286
1287 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1288
1289 logging.info("Wiping disk %d for instance %s on node %s using"
1290 " chunk size %s", idx, instance.name, node_name,
1291 wipe_chunk_size)
1292
1293 while offset < size:
1294 wipe_size = min(wipe_chunk_size, size - offset)
1295
1296 logging.debug("Wiping disk %d, offset %s, chunk %s",
1297 idx, offset, wipe_size)
1298
1299 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1300 offset, wipe_size)
1301 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1302 (idx, offset, wipe_size))
1303
1304 now = time.time()
1305 offset += wipe_size
1306 if now - last_output >= 60:
1307 eta = _CalcEta(now - start_time, offset, size)
1308 lu.LogInfo(" - done: %.1f%% ETA: %s",
1309 offset / float(size) * 100, utils.FormatSeconds(eta))
1310 last_output = now
1311 finally:
1312 logging.info("Resuming synchronization of disks for instance '%s'",
1313 instance.name)
1314
1315 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1316 (map(compat.snd, disks),
1317 instance),
1318 False)
1319
1320 if result.fail_msg:
1321 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1322 node_name, result.fail_msg)
1323 else:
1324 for idx, success in enumerate(result.payload):
1325 if not success:
1326 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1327 " failed", idx, instance.name)
1328
1329
1330 -def ImageDisks(lu, instance, image, disks=None):
1331 """Dumps an image onto an instance disk.
1332
1333 @type lu: L{LogicalUnit}
1334 @param lu: the logical unit on whose behalf we execute
1335 @type instance: L{objects.Instance}
1336 @param instance: the instance whose disks we should create
1337 @type image: string
1338 @param image: the image whose disks we should create
1339 @type disks: None or list of ints
1340 @param disks: disk indices
1341
1342 """
1343 node_uuid = instance.primary_node
1344 node_name = lu.cfg.GetNodeName(node_uuid)
1345
1346 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1347 if disks is None:
1348 disks = [(0, inst_disks[0])]
1349 else:
1350 disks = map(lambda idx: (idx, inst_disks[idx]), disks)
1351
1352 logging.info("Pausing synchronization of disks of instance '%s'",
1353 instance.name)
1354 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1355 (map(compat.snd, disks),
1356 instance),
1357 True)
1358 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1359
1360 for idx, success in enumerate(result.payload):
1361 if not success:
1362 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1363 " failed", idx, instance.name)
1364
1365 try:
1366 for (idx, device) in disks:
1367 lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
1368 idx, instance.name, node_name)
1369
1370 result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
1371 image, device.size)
1372 result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
1373 (idx, instance.name, node_name))
1374 finally:
1375 logging.info("Resuming synchronization of disks for instance '%s'",
1376 instance.name)
1377
1378 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1379 (map(compat.snd, disks),
1380 instance),
1381 False)
1382
1383 if result.fail_msg:
1384 lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
1385 " node '%s'", node_name, result.fail_msg)
1386 else:
1387 for idx, success in enumerate(result.payload):
1388 if not success:
1389 lu.LogWarning("Failed to resume synchronization of disk '%d' of"
1390 " instance '%s'", idx, instance.name)
1391
1394 """Wrapper for L{WipeDisks} that handles errors.
1395
1396 @type lu: L{LogicalUnit}
1397 @param lu: the logical unit on whose behalf we execute
1398 @type instance: L{objects.Instance}
1399 @param instance: the instance whose disks we should wipe
1400 @param disks: see L{WipeDisks}
1401 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1402 case of error
1403 @raise errors.OpPrereqError: in case of failure
1404
1405 """
1406 try:
1407 WipeDisks(lu, instance, disks=disks)
1408 except errors.OpExecError:
1409 logging.warning("Wiping disks for instance '%s' failed",
1410 instance.name)
1411 _UndoCreateDisks(lu, cleanup, instance)
1412 raise
1413
1416 """Return the instance disks selected by the disks list
1417
1418 @type disks: list of L{objects.Disk} or None
1419 @param disks: selected disks
1420 @rtype: list of L{objects.Disk}
1421 @return: selected instance disks to act on
1422
1423 """
1424 if disks is None:
1425 return instance_disks
1426 else:
1427 inst_disks_uuids = [d.uuid for d in instance_disks]
1428 disks_uuids = [d.uuid for d in disks]
1429 if not set(disks_uuids).issubset(inst_disks_uuids):
1430 raise errors.ProgrammerError("Can only act on disks belonging to the"
1431 " target instance: expected a subset of %s,"
1432 " got %s" % (inst_disks_uuids, disks_uuids))
1433 return disks
1434
1435
1436 -def WaitForSync(lu, instance, disks=None, oneshot=False):
1437 """Sleep and poll for an instance's disk to sync.
1438
1439 """
1440 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1441 if not inst_disks or disks is not None and not disks:
1442 return True
1443
1444 disks = [d for d in ExpandCheckDisks(inst_disks, disks)
1445 if d.dev_type in constants.DTS_INT_MIRROR]
1446
1447 if not oneshot:
1448 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1449
1450 node_uuid = instance.primary_node
1451 node_name = lu.cfg.GetNodeName(node_uuid)
1452
1453
1454
1455 retries = 0
1456 degr_retries = 10
1457 while True:
1458 max_time = 0
1459 done = True
1460 cumul_degraded = False
1461 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1462 msg = rstats.fail_msg
1463 if msg:
1464 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1465 retries += 1
1466 if retries >= 10:
1467 raise errors.RemoteError("Can't contact node %s for mirror data,"
1468 " aborting." % node_name)
1469 time.sleep(6)
1470 continue
1471 rstats = rstats.payload
1472 retries = 0
1473 for i, mstat in enumerate(rstats):
1474 if mstat is None:
1475 lu.LogWarning("Can't compute data for node %s/%s",
1476 node_name, disks[i].iv_name)
1477 continue
1478
1479 cumul_degraded = (cumul_degraded or
1480 (mstat.is_degraded and mstat.sync_percent is None))
1481 if mstat.sync_percent is not None:
1482 done = False
1483 if mstat.estimated_time is not None:
1484 rem_time = ("%s remaining (estimated)" %
1485 utils.FormatSeconds(mstat.estimated_time))
1486 max_time = mstat.estimated_time
1487 else:
1488 rem_time = "no time estimate"
1489 max_time = 5
1490 lu.LogInfo("- device %s: %5.2f%% done, %s",
1491 disks[i].iv_name, mstat.sync_percent, rem_time)
1492
1493
1494
1495
1496 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1497 logging.info("Degraded disks found, %d retries left", degr_retries)
1498 degr_retries -= 1
1499 time.sleep(1)
1500 continue
1501
1502 if done or oneshot:
1503 break
1504
1505 time.sleep(min(60, max_time))
1506
1507 if done:
1508 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1509
1510 return not cumul_degraded
1511
1514 """Shutdown block devices of an instance.
1515
1516 This does the shutdown on all nodes of the instance.
1517
1518 If the ignore_primary is false, errors on the primary node are
1519 ignored.
1520
1521 Modifies the configuration of the instance, so the caller should re-read the
1522 instance configuration, if needed.
1523
1524 """
1525 all_result = True
1526
1527 if disks is None:
1528
1529 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1530 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1531 disks = ExpandCheckDisks(inst_disks, disks)
1532
1533 for disk in disks:
1534 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1535 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1536 msg = result.fail_msg
1537 if msg:
1538 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1539 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1540 if ((node_uuid == instance.primary_node and not ignore_primary) or
1541 (node_uuid != instance.primary_node and not result.offline)):
1542 all_result = False
1543 return all_result
1544
1547 """Shutdown block devices of an instance.
1548
1549 This function checks if an instance is running, before calling
1550 _ShutdownInstanceDisks.
1551
1552 """
1553 if req_states is None:
1554 req_states = INSTANCE_DOWN
1555 CheckInstanceState(lu, instance, req_states, msg="cannot shutdown disks")
1556 ShutdownInstanceDisks(lu, instance, disks=disks)
1557
1558
1559 -def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1560 ignore_size=False):
1561 """Prepare the block devices for an instance.
1562
1563 This sets up the block devices on all nodes.
1564
1565 Modifies the configuration of the instance, so the caller should re-read the
1566 instance configuration, if needed.
1567
1568 @type lu: L{LogicalUnit}
1569 @param lu: the logical unit on whose behalf we execute
1570 @type instance: L{objects.Instance}
1571 @param instance: the instance for whose disks we assemble
1572 @type disks: list of L{objects.Disk} or None
1573 @param disks: which disks to assemble (or all, if None)
1574 @type ignore_secondaries: boolean
1575 @param ignore_secondaries: if true, errors on secondary nodes
1576 won't result in an error return from the function
1577 @type ignore_size: boolean
1578 @param ignore_size: if true, the current known size of the disk
1579 will not be used during the disk activation, useful for cases
1580 when the size is wrong
1581 @return: False if the operation failed, otherwise a list of
1582 (host, instance_visible_name, node_visible_name)
1583 with the mapping from node devices to instance devices, as well as the
1584 payloads of the RPC calls
1585
1586 """
1587 device_info = []
1588 disks_ok = True
1589 payloads = []
1590
1591 if disks is None:
1592
1593 instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)
1594
1595 inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
1596 disks = ExpandCheckDisks(inst_disks, disks)
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608 for idx, inst_disk in enumerate(disks):
1609 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1610 instance.primary_node):
1611 if ignore_size:
1612 node_disk = node_disk.Copy()
1613 node_disk.UnsetSize()
1614 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1615 instance, False, idx)
1616 msg = result.fail_msg
1617 if msg:
1618 secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance.uuid)
1619 is_offline_secondary = (node_uuid in secondary_nodes and
1620 result.offline)
1621 lu.LogWarning("Could not prepare block device %s on node %s"
1622 " (is_primary=False, pass=1): %s",
1623 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1624 if not (ignore_secondaries or is_offline_secondary):
1625 disks_ok = False
1626
1627
1628
1629
1630 for idx, inst_disk in enumerate(disks):
1631 dev_path = None
1632
1633 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1634 instance.primary_node):
1635 if node_uuid != instance.primary_node:
1636 continue
1637 if ignore_size:
1638 node_disk = node_disk.Copy()
1639 node_disk.UnsetSize()
1640 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1641 instance, True, idx)
1642 payloads.append(result.payload)
1643 msg = result.fail_msg
1644 if msg:
1645 lu.LogWarning("Could not prepare block device %s on node %s"
1646 " (is_primary=True, pass=2): %s",
1647 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1648 disks_ok = False
1649 else:
1650 dev_path, _, __ = result.payload
1651
1652 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1653 inst_disk.iv_name, dev_path))
1654
1655 if not disks_ok:
1656 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1657
1658 return disks_ok, device_info, payloads
1659
1662 """Start the disks of an instance.
1663
1664 Modifies the configuration of the instance, so the caller should re-read the
1665 instance configuration, if needed.
1666
1667 """
1668 disks_ok, _, _ = AssembleInstanceDisks(lu, instance,
1669 ignore_secondaries=force)
1670 if not disks_ok:
1671 ShutdownInstanceDisks(lu, instance)
1672 if force is not None and not force:
1673 lu.LogWarning("",
1674 hint=("If the message above refers to a secondary node,"
1675 " you can retry the operation using '--force'"))
1676 raise errors.OpExecError("Disk consistency error")
1677
1680 """Grow a disk of an instance.
1681
1682 """
1683 HPATH = "disk-grow"
1684 HTYPE = constants.HTYPE_INSTANCE
1685 REQ_BGL = False
1686
1695
1703
1705 """Build hooks env.
1706
1707 This runs on the master, the primary and all the secondaries.
1708
1709 """
1710 env = {
1711 "DISK": self.op.disk,
1712 "AMOUNT": self.op.amount,
1713 "ABSOLUTE": self.op.absolute,
1714 }
1715 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1716 return env
1717
1725
1727 """Check prerequisites.
1728
1729 This checks that the instance is in the cluster.
1730
1731 """
1732 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733 assert self.instance is not None, \
1734 "Cannot retrieve locked instance %s" % self.op.instance_name
1735 node_uuids = list(self.cfg.GetInstanceNodes(self.instance.uuid))
1736 for node_uuid in node_uuids:
1737 CheckNodeOnline(self, node_uuid)
1738 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1739
1740 self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk))
1741
1742 if self.disk.dev_type not in constants.DTS_GROWABLE:
1743 raise errors.OpPrereqError(
1744 "Instance's disk layout %s does not support"
1745 " growing" % self.disk.dev_type, errors.ECODE_INVAL)
1746
1747 if self.op.absolute:
1748 self.target = self.op.amount
1749 self.delta = self.target - self.disk.size
1750 if self.delta < 0:
1751 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1752 "current disk size (%s)" %
1753 (utils.FormatUnit(self.target, "h"),
1754 utils.FormatUnit(self.disk.size, "h")),
1755 errors.ECODE_STATE)
1756 else:
1757 self.delta = self.op.amount
1758 self.target = self.disk.size + self.delta
1759 if self.delta < 0:
1760 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1761 utils.FormatUnit(self.delta, "h"),
1762 errors.ECODE_INVAL)
1763
1764 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1765
1766 self._CheckIPolicy(self.target)
1767
1778
1801
1802 - def Exec(self, feedback_fn):
1803 """Execute disk grow.
1804
1805 """
1806 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1807 assert (self.owned_locks(locking.LEVEL_NODE) ==
1808 self.owned_locks(locking.LEVEL_NODE_RES))
1809
1810 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1811
1812 disks_ok, _, _ = AssembleInstanceDisks(self, self.instance,
1813 disks=[self.disk])
1814 if not disks_ok:
1815 raise errors.OpExecError("Cannot activate block device to grow")
1816
1817 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1818 (self.op.disk, self.instance.name,
1819 utils.FormatUnit(self.delta, "h"),
1820 utils.FormatUnit(self.target, "h")))
1821
1822
1823 inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
1824 for node_uuid in inst_nodes:
1825 result = self.rpc.call_blockdev_grow(node_uuid,
1826 (self.disk, self.instance),
1827 self.delta, True, True,
1828 self.node_es_flags[node_uuid])
1829 result.Raise("Dry-run grow request failed to node %s" %
1830 self.cfg.GetNodeName(node_uuid))
1831
1832 if wipe_disks:
1833
1834 result = self.rpc.call_blockdev_getdimensions(
1835 self.instance.primary_node, [([self.disk], self.instance)])
1836 result.Raise("Failed to retrieve disk size from node '%s'" %
1837 self.instance.primary_node)
1838
1839 (disk_dimensions, ) = result.payload
1840
1841 if disk_dimensions is None:
1842 raise errors.OpExecError("Failed to retrieve disk size from primary"
1843 " node '%s'" % self.instance.primary_node)
1844 (disk_size_in_bytes, _) = disk_dimensions
1845
1846 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1847
1848 assert old_disk_size >= self.disk.size, \
1849 ("Retrieved disk size too small (got %s, should be at least %s)" %
1850 (old_disk_size, self.disk.size))
1851 else:
1852 old_disk_size = None
1853
1854
1855
1856 for node_uuid in inst_nodes:
1857 result = self.rpc.call_blockdev_grow(node_uuid,
1858 (self.disk, self.instance),
1859 self.delta, False, True,
1860 self.node_es_flags[node_uuid])
1861 result.Raise("Grow request failed to node %s" %
1862 self.cfg.GetNodeName(node_uuid))
1863
1864
1865 node_uuid = self.instance.primary_node
1866 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1867 self.delta, False, False,
1868 self.node_es_flags[node_uuid])
1869 result.Raise("Grow request failed to node %s" %
1870 self.cfg.GetNodeName(node_uuid))
1871
1872 self.disk.RecordGrow(self.delta)
1873 self.cfg.Update(self.instance, feedback_fn)
1874 self.cfg.Update(self.disk, feedback_fn)
1875
1876
1877 ReleaseLocks(self, locking.LEVEL_NODE)
1878
1879
1880 self.WConfdClient().DownGradeLocksLevel(
1881 locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])
1882
1883 assert wipe_disks ^ (old_disk_size is None)
1884
1885 if wipe_disks:
1886 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
1887 assert inst_disks[self.op.disk] == self.disk
1888
1889
1890 WipeDisks(self, self.instance,
1891 disks=[(self.op.disk, self.disk, old_disk_size)])
1892
1893 if self.op.wait_for_sync:
1894 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1895 if disk_abort:
1896 self.LogWarning("Disk syncing has not returned a good status; check"
1897 " the instance")
1898 if not self.instance.disks_active:
1899 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1900 elif not self.instance.disks_active:
1901 self.LogWarning("Not shutting down the disk even if the instance is"
1902 " not supposed to be running because no wait for"
1903 " sync mode was requested")
1904
1905 assert self.owned_locks(locking.LEVEL_NODE_RES)
1906 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1907
1910 """Replace the disks of an instance.
1911
1912 """
1913 HPATH = "mirrors-replace"
1914 HTYPE = constants.HTYPE_INSTANCE
1915 REQ_BGL = False
1916
1934
1936 self._ExpandAndLockInstance(allow_forthcoming=True)
1937
1938 assert locking.LEVEL_NODE not in self.needed_locks
1939 assert locking.LEVEL_NODE_RES not in self.needed_locks
1940 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1941
1942 assert self.op.iallocator is None or self.op.remote_node is None, \
1943 "Conflicting options"
1944
1945 if self.op.remote_node is not None:
1946 (self.op.remote_node_uuid, self.op.remote_node) = \
1947 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1948 self.op.remote_node)
1949
1950
1951
1952
1953
1954 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1955 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1956 else:
1957 self.needed_locks[locking.LEVEL_NODE] = []
1958 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1959
1960 if self.op.iallocator is not None:
1961
1962 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1963
1964 self.needed_locks[locking.LEVEL_NODE_RES] = []
1965
1966 self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
1967 self.dont_collate_locks[locking.LEVEL_NODE] = True
1968 self.dont_collate_locks[locking.LEVEL_NODE_RES] = True
1969
1970 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1971 self.op.instance_name, self.op.mode,
1972 self.op.iallocator, self.op.remote_node_uuid,
1973 self.op.disks, self.op.early_release,
1974 self.op.ignore_ipolicy)
1975
1976 self.tasklets = [self.replacer]
1977
1979 if level == locking.LEVEL_NODEGROUP:
1980 assert self.op.remote_node_uuid is None
1981 assert self.op.iallocator is not None
1982 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1983
1984 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1985
1986
1987 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1988 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1989
1990 elif level == locking.LEVEL_NODE:
1991 if self.op.iallocator is not None:
1992 assert self.op.remote_node_uuid is None
1993 assert not self.needed_locks[locking.LEVEL_NODE]
1994
1995
1996 self.needed_locks[locking.LEVEL_NODE] = \
1997 [node_uuid
1998 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1999 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
2000 else:
2001 self._LockInstancesNodes()
2002
2003 elif level == locking.LEVEL_NODE_RES:
2004
2005 self.needed_locks[locking.LEVEL_NODE_RES] = \
2006 self.needed_locks[locking.LEVEL_NODE]
2007
2023
2025 """Build hooks nodes.
2026
2027 """
2028 instance = self.replacer.instance
2029 nl = [
2030 self.cfg.GetMasterNode(),
2031 instance.primary_node,
2032 ]
2033 if self.op.remote_node_uuid is not None:
2034 nl.append(self.op.remote_node_uuid)
2035 return nl, nl
2036
2047
2050 """Bring up an instance's disks.
2051
2052 """
2053 REQ_BGL = False
2054
2059
2063
2065 """Check prerequisites.
2066
2067 This checks that the instance is in the cluster.
2068
2069 """
2070 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
2071 assert self.instance is not None, \
2072 "Cannot retrieve locked instance %s" % self.op.instance_name
2073 CheckNodeOnline(self, self.instance.primary_node)
2074
2075 - def Exec(self, feedback_fn):
2091
2094 """Shutdown an instance's disks.
2095
2096 """
2097 REQ_BGL = False
2098
2103
2107
2109 """Check prerequisites.
2110
2111 This checks that the instance is in the cluster.
2112
2113 """
2114 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
2115 assert self.instance is not None, \
2116 "Cannot retrieve locked instance %s" % self.op.instance_name
2117
2118 - def Exec(self, feedback_fn):
2126
2130 """Check that mirrors are not degraded.
2131
2132 @attention: The device has to be annotated already.
2133
2134 The ldisk parameter, if True, will change the test from the
2135 is_degraded attribute (which represents overall non-ok status for
2136 the device(s)) to the ldisk (representing the local storage status).
2137
2138 """
2139 result = True
2140
2141 if on_primary or dev.AssembleOnSecondary():
2142 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
2143 msg = rstats.fail_msg
2144 if msg:
2145 lu.LogWarning("Can't find disk on node %s: %s",
2146 lu.cfg.GetNodeName(node_uuid), msg)
2147 result = False
2148 elif not rstats.payload:
2149 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
2150 result = False
2151 else:
2152 if ldisk:
2153 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2154 else:
2155 result = result and not rstats.payload.is_degraded
2156
2157 if dev.children:
2158 for child in dev.children:
2159 result = result and _CheckDiskConsistencyInner(lu, instance, child,
2160 node_uuid, on_primary)
2161
2162 return result
2163
2172
2175 """Wrapper around call_blockdev_find to annotate diskparams.
2176
2177 @param lu: A reference to the lu object
2178 @param node_uuid: The node to call out
2179 @param dev: The device to find
2180 @param instance: The instance object the device belongs to
2181 @returns The result of the rpc call
2182
2183 """
2184 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
2185 return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
2186
2189 """Generate a suitable LV name.
2190
2191 This will generate a logical volume name for the given instance.
2192
2193 """
2194 results = []
2195 for val in exts:
2196 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
2197 results.append("%s%s" % (new_id, val))
2198 return results
2199
2202 """Replaces disks for an instance.
2203
2204 Note: Locking is not within the scope of this class.
2205
2206 """
2207 - def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
2208 remote_node_uuid, disks, early_release, ignore_ipolicy):
2209 """Initializes this class.
2210
2211 """
2212 Tasklet.__init__(self, lu)
2213
2214
2215 self.instance_uuid = instance_uuid
2216 self.instance_name = instance_name
2217 self.mode = mode
2218 self.iallocator_name = iallocator_name
2219 self.remote_node_uuid = remote_node_uuid
2220 self.disks = disks
2221 self.early_release = early_release
2222 self.ignore_ipolicy = ignore_ipolicy
2223
2224
2225 self.instance = None
2226 self.new_node_uuid = None
2227 self.target_node_uuid = None
2228 self.other_node_uuid = None
2229 self.remote_node_info = None
2230 self.node_secondary_ip = None
2231
2232 @staticmethod
2233 - def _RunAllocator(lu, iallocator_name, instance_uuid,
2234 relocate_from_node_uuids):
2235 """Compute a new secondary node using an IAllocator.
2236
2237 """
2238 req = iallocator.IAReqRelocate(
2239 inst_uuid=instance_uuid,
2240 relocate_from_node_uuids=list(relocate_from_node_uuids))
2241 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
2242
2243 ial.Run(iallocator_name)
2244
2245 if not ial.success:
2246 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
2247 " %s" % (iallocator_name, ial.info),
2248 errors.ECODE_NORES)
2249
2250 remote_node_name = ial.result[0]
2251 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
2252
2253 if remote_node is None:
2254 raise errors.OpPrereqError("Node %s not found in configuration" %
2255 remote_node_name, errors.ECODE_NOENT)
2256
2257 lu.LogInfo("Selected new secondary for instance '%s': %s",
2258 instance_uuid, remote_node_name)
2259
2260 return remote_node.uuid
2261
2268
2270 """Checks if the instance disks are activated.
2271
2272 @param instance: The instance to check disks
2273 @return: True if they are activated, False otherwise
2274
2275 """
2276 node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
2277
2278 for idx, dev in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
2279 for node_uuid in node_uuids:
2280 self.lu.LogInfo("Checking disk/%d on %s", idx,
2281 self.cfg.GetNodeName(node_uuid))
2282
2283 result = _BlockdevFind(self, node_uuid, dev, instance)
2284
2285 if result.offline:
2286 continue
2287 elif result.fail_msg or not result.payload:
2288 return False
2289
2290 return True
2291
2293 """Check prerequisites.
2294
2295 This checks that the instance is in the cluster.
2296
2297 """
2298 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
2299 assert self.instance is not None, \
2300 "Cannot retrieve locked instance %s" % self.instance_name
2301
2302 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
2303 if len(secondary_nodes) != 1:
2304 raise errors.OpPrereqError("The instance has a strange layout,"
2305 " expected one secondary but found %d" %
2306 len(secondary_nodes),
2307 errors.ECODE_FAULT)
2308
2309 secondary_node_uuid = secondary_nodes[0]
2310
2311 if self.iallocator_name is None:
2312 remote_node_uuid = self.remote_node_uuid
2313 else:
2314 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
2315 self.instance.uuid,
2316 secondary_nodes)
2317
2318 if remote_node_uuid is None:
2319 self.remote_node_info = None
2320 else:
2321 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
2322 "Remote node '%s' is not locked" % remote_node_uuid
2323
2324 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2325 assert self.remote_node_info is not None, \
2326 "Cannot retrieve locked node %s" % remote_node_uuid
2327
2328 if remote_node_uuid == self.instance.primary_node:
2329 raise errors.OpPrereqError("The specified node is the primary node of"
2330 " the instance", errors.ECODE_INVAL)
2331
2332 if remote_node_uuid == secondary_node_uuid:
2333 raise errors.OpPrereqError("The specified node is already the"
2334 " secondary node of the instance",
2335 errors.ECODE_INVAL)
2336
2337 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2338 constants.REPLACE_DISK_CHG):
2339 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2340 errors.ECODE_INVAL)
2341
2342 if self.mode == constants.REPLACE_DISK_AUTO:
2343 if not self._CheckDisksActivated(self.instance):
2344 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2345 " first" % self.instance_name,
2346 errors.ECODE_STATE)
2347 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2348 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2349
2350 if faulty_primary and faulty_secondary:
2351 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2352 " one node and can not be repaired"
2353 " automatically" % self.instance_name,
2354 errors.ECODE_STATE)
2355
2356 if faulty_primary:
2357 self.disks = faulty_primary
2358 self.target_node_uuid = self.instance.primary_node
2359 self.other_node_uuid = secondary_node_uuid
2360 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2361 elif faulty_secondary:
2362 self.disks = faulty_secondary
2363 self.target_node_uuid = secondary_node_uuid
2364 self.other_node_uuid = self.instance.primary_node
2365 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2366 else:
2367 self.disks = []
2368 check_nodes = []
2369
2370 else:
2371
2372 if self.mode == constants.REPLACE_DISK_PRI:
2373 self.target_node_uuid = self.instance.primary_node
2374 self.other_node_uuid = secondary_node_uuid
2375 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2376
2377 elif self.mode == constants.REPLACE_DISK_SEC:
2378 self.target_node_uuid = secondary_node_uuid
2379 self.other_node_uuid = self.instance.primary_node
2380 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2381
2382 elif self.mode == constants.REPLACE_DISK_CHG:
2383 self.new_node_uuid = remote_node_uuid
2384 self.other_node_uuid = self.instance.primary_node
2385 self.target_node_uuid = secondary_node_uuid
2386 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2387
2388 CheckNodeNotDrained(self.lu, remote_node_uuid)
2389 CheckNodeVmCapable(self.lu, remote_node_uuid)
2390
2391 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2392 assert old_node_info is not None
2393 if old_node_info.offline and not self.early_release:
2394
2395 self.early_release = True
2396 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2397 " early-release mode", secondary_node_uuid)
2398
2399 else:
2400 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2401 self.mode)
2402
2403
2404 if not self.disks:
2405 self.disks = range(len(self.instance.disks))
2406
2407 disks = self.cfg.GetInstanceDisks(self.instance.uuid)
2408 if (not disks or
2409 not utils.AllDiskOfType(disks, [constants.DT_DRBD8])):
2410 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
2411 " instances", errors.ECODE_INVAL)
2412
2413
2414
2415 if self.remote_node_info:
2416
2417 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2418 cluster = self.cfg.GetClusterInfo()
2419 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2420 new_group_info)
2421 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
2422 self.remote_node_info, self.cfg,
2423 ignore=self.ignore_ipolicy)
2424
2425 for node_uuid in check_nodes:
2426 CheckNodeOnline(self.lu, node_uuid)
2427
2428 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2429 self.other_node_uuid,
2430 self.target_node_uuid]
2431 if node_uuid is not None)
2432
2433
2434 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2435 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2436
2437
2438 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2439
2440
2441 for disk_idx in self.disks:
2442 self.instance.FindDisk(disk_idx)
2443
2444
2445 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2446 in self.cfg.GetMultiNodeInfo(touched_nodes))
2447
2448 - def Exec(self, feedback_fn):
2449 """Execute disk replacement.
2450
2451 This dispatches the disk replacement to the appropriate handler.
2452
2453 """
2454 if __debug__:
2455
2456 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2457 assert set(owned_nodes) == set(self.node_secondary_ip), \
2458 ("Incorrect node locks, owning %s, expected %s" %
2459 (owned_nodes, self.node_secondary_ip.keys()))
2460 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2461 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2462
2463 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2464 assert list(owned_instances) == [self.instance_name], \
2465 "Instance '%s' not locked" % self.instance_name
2466
2467 if not self.disks:
2468 feedback_fn("No disks need replacement for instance '%s'" %
2469 self.instance.name)
2470 return
2471
2472 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2473 (utils.CommaJoin(self.disks), self.instance.name))
2474 feedback_fn("Current primary node: %s" %
2475 self.cfg.GetNodeName(self.instance.primary_node))
2476 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
2477 feedback_fn("Current secondary node: %s" %
2478 utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))
2479
2480 activate_disks = not self.instance.disks_active
2481
2482
2483
2484
2485 if activate_disks and not self.instance.forthcoming:
2486 StartInstanceDisks(self.lu, self.instance, True)
2487
2488 self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)
2489
2490 try:
2491
2492 if self.new_node_uuid is not None:
2493 fn = self._ExecDrbd8Secondary
2494 else:
2495 fn = self._ExecDrbd8DiskOnly
2496
2497 result = fn(feedback_fn)
2498 finally:
2499
2500
2501 if activate_disks and not self.instance.forthcoming:
2502 _SafeShutdownInstanceDisks(self.lu, self.instance,
2503 req_states=INSTANCE_NOT_RUNNING)
2504
2505 self.lu.AssertReleasedLocks(locking.LEVEL_NODE)
2506
2507 if __debug__:
2508
2509 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2510 nodes = frozenset(self.node_secondary_ip)
2511 assert ((self.early_release and not owned_nodes) or
2512 (not self.early_release and not (set(owned_nodes) - nodes))), \
2513 ("Not owning the correct locks, early_release=%s, owned=%r,"
2514 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2515
2516 return result
2517
2519 self.lu.LogInfo("Checking volume groups")
2520
2521 vgname = self.cfg.GetVGName()
2522
2523
2524 results = self.rpc.call_vg_list(node_uuids)
2525 if not results:
2526 raise errors.OpExecError("Can't list volume groups on the nodes")
2527
2528 for node_uuid in node_uuids:
2529 res = results[node_uuid]
2530 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2531 if vgname not in res.payload:
2532 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2533 (vgname, self.cfg.GetNodeName(node_uuid)))
2534
2536
2537 for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)):
2538 if idx not in self.disks:
2539 continue
2540
2541 for node_uuid in node_uuids:
2542 self.lu.LogInfo("Checking disk/%d on %s", idx,
2543 self.cfg.GetNodeName(node_uuid))
2544
2545 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2546
2547 msg = result.fail_msg
2548 if msg or not result.payload:
2549 if not msg:
2550 msg = "disk not found"
2551 if not self._CheckDisksActivated(self.instance):
2552 extra_hint = ("\nDisks seem to be not properly activated. Try"
2553 " running activate-disks on the instance before"
2554 " using replace-disks.")
2555 else:
2556 extra_hint = ""
2557 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2558 (idx, self.cfg.GetNodeName(node_uuid), msg,
2559 extra_hint))
2560
2575
2577 """Create new storage on the primary or secondary node.
2578
2579 This is only used for same-node replaces, not for changing the
2580 secondary node, hence we don't want to modify the existing disk.
2581
2582 """
2583 iv_names = {}
2584
2585 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
2586 disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
2587 for idx, dev in enumerate(disks):
2588 if idx not in self.disks:
2589 continue
2590
2591 self.lu.LogInfo("Adding storage on %s for disk/%d",
2592 self.cfg.GetNodeName(node_uuid), idx)
2593
2594 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2595 names = _GenerateUniqueNames(self.lu, lv_names)
2596
2597 (data_disk, meta_disk) = dev.children
2598 vg_data = data_disk.logical_id[0]
2599 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2600 logical_id=(vg_data, names[0]),
2601 params=data_disk.params)
2602 vg_meta = meta_disk.logical_id[0]
2603 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2604 size=constants.DRBD_META_SIZE,
2605 logical_id=(vg_meta, names[1]),
2606 params=meta_disk.params)
2607
2608 new_lvs = [lv_data, lv_meta]
2609 old_lvs = [child.Copy() for child in dev.children]
2610 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2611 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2612
2613
2614 for new_lv in new_lvs:
2615 try:
2616 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2617 GetInstanceInfoText(self.instance), False,
2618 excl_stor)
2619 except errors.DeviceCreationError, e:
2620 raise errors.OpExecError("Can't create block device: %s" % e.message)
2621
2622 return iv_names
2623
2625 for name, (dev, _, _) in iv_names.iteritems():
2626 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2627
2628 msg = result.fail_msg
2629 if msg or not result.payload:
2630 if not msg:
2631 msg = "disk not found"
2632 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2633 (name, msg))
2634
2635 if result.payload.is_degraded:
2636 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2637
2639 for name, (_, old_lvs, _) in iv_names.iteritems():
2640 self.lu.LogInfo("Remove logical volumes for %s", name)
2641
2642 for lv in old_lvs:
2643 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2644 .fail_msg
2645 if msg:
2646 self.lu.LogWarning("Can't remove old LV: %s", msg,
2647 hint="remove unused LVs manually")
2648
2650 """Replace a disk on the primary or secondary for DRBD 8.
2651
2652 The algorithm for replace is quite complicated:
2653
2654 1. for each disk to be replaced:
2655
2656 1. create new LVs on the target node with unique names
2657 1. detach old LVs from the drbd device
2658 1. rename old LVs to name_replaced.<time_t>
2659 1. rename new LVs to old LVs
2660 1. attach the new LVs (with the old names now) to the drbd device
2661
2662 1. wait for sync across all devices
2663
2664 1. for each modified disk:
2665
2666 1. remove old LVs (which have the name name_replaces.<time_t>)
2667
2668 Failures are not very well handled.
2669
2670 """
2671 steps_total = 6
2672
2673 if self.instance.forthcoming:
2674 feedback_fn("Instance forthcoming, not touching disks")
2675 return
2676
2677
2678 self.lu.LogStep(1, steps_total, "Check device existence")
2679 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2680 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2681
2682
2683 self.lu.LogStep(2, steps_total, "Check peer consistency")
2684 self._CheckDisksConsistency(
2685 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2686 False)
2687
2688
2689 self.lu.LogStep(3, steps_total, "Allocate new storage")
2690 iv_names = self._CreateNewStorage(self.target_node_uuid)
2691
2692
2693 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2694 for dev, old_lvs, new_lvs in iv_names.itervalues():
2695 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2696
2697 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2698 (dev, self.instance),
2699 (old_lvs, self.instance))
2700 result.Raise("Can't detach drbd from local storage on node"
2701 " %s for device %s" %
2702 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712 temp_suffix = int(time.time())
2713 ren_fn = lambda d, suff: (d.logical_id[0],
2714 d.logical_id[1] + "_replaced-%s" % suff)
2715
2716
2717 rename_old_to_new = []
2718 for to_ren in old_lvs:
2719 result = self.rpc.call_blockdev_find(self.target_node_uuid,
2720 (to_ren, self.instance))
2721 if not result.fail_msg and result.payload:
2722
2723 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2724
2725 self.lu.LogInfo("Renaming the old LVs on the target node")
2726 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2727 rename_old_to_new)
2728 result.Raise("Can't rename old LVs on node %s" %
2729 self.cfg.GetNodeName(self.target_node_uuid))
2730
2731
2732 self.lu.LogInfo("Renaming the new LVs on the target node")
2733 rename_new_to_old = [(new, old.logical_id)
2734 for old, new in zip(old_lvs, new_lvs)]
2735 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2736 rename_new_to_old)
2737 result.Raise("Can't rename new LVs on node %s" %
2738 self.cfg.GetNodeName(self.target_node_uuid))
2739
2740
2741 for old, new in zip(old_lvs, new_lvs):
2742 new.logical_id = old.logical_id
2743
2744
2745
2746
2747 for disk in old_lvs:
2748 disk.logical_id = ren_fn(disk, temp_suffix)
2749
2750
2751 self.lu.LogInfo("Adding new mirror component on %s",
2752 self.cfg.GetNodeName(self.target_node_uuid))
2753 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2754 (dev, self.instance),
2755 (new_lvs, self.instance))
2756 msg = result.fail_msg
2757 if msg:
2758 for new_lv in new_lvs:
2759 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2760 (new_lv, self.instance)).fail_msg
2761 if msg2:
2762 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2763 hint=("cleanup manually the unused logical"
2764 "volumes"))
2765 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2766
2767 cstep = itertools.count(5)
2768