31 """Common functions used by multiple logical units."""
32
33 import copy
34 import os
35
36 from ganeti import compat
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 import ganeti.rpc.node as rpc
45 from ganeti import ssconf
46 from ganeti import utils
47
48
49
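# Instance admin-state groupings; logical units pass one of these lists to
# CheckInstanceState() to say which admin states an operation accepts.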
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]


CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


61 """Expand an item name.
62
63 @param expand_fn: the function to use for expansion
64 @param name: requested item name
65 @param kind: text description ('Node' or 'Instance')
66 @return: the result of the expand_fn, if successful
67 @raise errors.OpPrereqError: if the item is not found
68
69 """
70 (uuid, full_name) = expand_fn(name)
71 if uuid is None or full_name is None:
72 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
73 errors.ECODE_NOENT)
74 return (uuid, full_name)
75
76
78 """Wrapper over L{_ExpandItemName} for instance."""
79 (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
80 if expected_uuid is not None and uuid != expected_uuid:
81 raise errors.OpPrereqError(
82 "The instances UUID '%s' does not match the expected UUID '%s' for"
83 " instance '%s'. Maybe the instance changed since you submitted this"
84 " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
85 return (uuid, full_name)


def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
      expectation). If it does not match, a L{errors.OpPrereqError} is
      raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The node's UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)


def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)


def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instance_names: set or frozenset
  @param owned_instance_names: List of currently owned instances

  """
  wanted_instances = frozenset(cfg.GetInstanceNames(
    cfg.GetNodeGroupInstances(group_uuid)))
  if owned_instance_names != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instance_names)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
                  for name in short_node_names]
  else:
    node_uuids = lu.cfg.GetNodeList()

  return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])


def GetWantedInstances(lu, short_inst_names):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_inst_names: list
  @param short_inst_names: list of instance names or None for all instances
  @rtype: tuple of lists
  @return: tuple of (instance UUIDs, instance names)
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if short_inst_names:
    inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
                  for name in short_inst_names]
  else:
    inst_uuids = lu.cfg.GetInstanceList()
  return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  except Exception, err:
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetMasterNodeInfo()

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  vm_node_uuids = list(online_node_uuid_set.intersection(
    lu.cfg.GetVmCapableNodeList()))

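  # Never upload these files to the master node itself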
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

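  # Gather the lists of files to distribute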
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

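  # The cluster configuration file is distributed by ConfigWriter, never here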
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

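  # Upload each file to the nodes it belongs to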
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)


239 """Compute files external to Ganeti which need to be consistent.
240
241 @type redist: boolean
242 @param redist: Whether to include files which need to be redistributed
243
244 """
245
246 files_all = set([
247 pathutils.SSH_KNOWN_HOSTS_FILE,
248 pathutils.CONFD_HMAC_KEY,
249 pathutils.CLUSTER_DOMAIN_SECRET_FILE,
250 pathutils.SPICE_CERT_FILE,
251 pathutils.SPICE_CACERT_FILE,
252 pathutils.RAPI_USERS_FILE,
253 ])
254
255 if redist:
256
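    # When redistributing, only the RAPI certificate is added here; the other
    # certificates and the ssconf files are not re-pushed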
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

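  # Optional files: every entry must also appear in one of the required sets
  # (see the assertion below); individual nodes are allowed to lack them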
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

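  # Files which should only be on master candidates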
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  if (not redist and (cluster.IsFileStorageEnabled() or
                      cluster.IsSharedFileStorageEnabled())):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

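  # Files which should be on all VM-capable nodes: the ancillary files of
  # every enabled hypervisor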
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)


def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(node_uuids, fname)
    for to_node_uuid, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(to_node_uuid), msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():

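    # An RPC or validation failure is fatal; a missing OS is fatal only if
    # 'required' is set, otherwise it is merely reported below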
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))


def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))




def _CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []

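  # Reject PVs whose name contains ':' (forbidden character)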
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if any(es_flags.values()):

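    # With exclusive storage the spindle count comes from the disks
    # themselves rather than from the backend parameters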
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)

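      # A spindle_use of None is skipped by the min/max spec check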
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes a set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))


def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
      we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:

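        # Scalar ipolicy parameters are stored as floats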
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:

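        # The remaining keys hold lists (e.g. the allowed disk templates);
        # store a copy of the given value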
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy


725 """Little helper wrapper to the rpc annotation method.
726
727 @param instance: The instance object
728 @type devs: List of L{objects.Disk}
729 @param devs: The root devices (not any of its children!)
730 @param cfg: The config object
731 @returns The annotated disk copies
732 @see L{rpc.node.AnnotateDiskParams}
733
734 """
735 return rpc.AnnotateDiskParams(devs, cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @return: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))
  return ret


def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in non_vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance UUID as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (uuid, inst) in instances.items():
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % inst.name

    inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type inst_uuid: string
  @param inst_uuid: Instance UUID
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (cfg.GetInstanceName(inst_uuid),
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin(
                 "%s (to %s)" %
                 (name, _NodeEvacDest(use_nodes, group, node_names))
                 for (name, group, node_names) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, node_names):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(node_names)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceLvsToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
      object as value

  """
  return dict(((node_uuid, vol), inst)
              for inst in instances
              for (node_uuid, vols) in inst.MapLVsByNode().items()
              for vol in vols)


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: string
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param req_states: the required states of the instance
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
        [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)




def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node_uuid).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)


def CheckDiskTemplateEnabled(cluster, disk_template):
  """Helper function to check if a disk template is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type disk_template: str
  @param disk_template: the disk template to be checked

  """
  assert disk_template is not None
  if disk_template not in constants.DISK_TEMPLATES:
    raise errors.OpPrereqError("'%s' is not a valid disk template."
                               " Valid disk templates are: %s" %
                               (disk_template,
                                ",".join(constants.DISK_TEMPLATES)))
  if disk_template not in cluster.enabled_disk_templates:
    raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
                               " Enabled disk templates are: %s" %
                               (disk_template,
                                ",".join(cluster.enabled_disk_templates)))


def CheckStorageTypeEnabled(cluster, storage_type):
  """Helper function to check if a storage type is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type storage_type: str
  @param storage_type: the storage type to be checked

  """
  assert storage_type is not None
  assert storage_type in constants.STORAGE_TYPES

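  # ST_LVM_PV is special-cased and checked via ST_LVM_VG instead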
  if storage_type == constants.ST_LVM_PV:
    CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
  else:
    possible_disk_templates = \
      utils.storage.GetDiskTemplatesOfStorageTypes(storage_type)
    for disk_template in possible_disk_templates:
      if disk_template in cluster.enabled_disk_templates:
        return
    raise errors.OpPrereqError("No disk template of storage type '%s' is"
                               " enabled in this cluster. Enabled disk"
                               " templates are: %s" %
                               (storage_type,
                                ",".join(cluster.enabled_disk_templates)))


def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
  """Checks ipolicy disk templates against enabled disk templates.

  @type ipolicy: dict
  @param ipolicy: the new ipolicy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates on the
      cluster
  @raises errors.OpPrereqError: if there is at least one allowed disk
      template that is not also enabled.

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
  not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
  if not_enabled:
    raise errors.OpPrereqError("The following disk templates are allowed"
                               " by the ipolicy, but not enabled on the"
                               " cluster: %s" % utils.CommaJoin(not_enabled))




def CheckDiskAccessModeConsistency(parameters, cfg, group=None):
  """Checks if the access param is consistent with the cluster configuration.

  @note: requires a configuration lock to run.
  @param parameters: the parameters to validate
  @param cfg: the cfg object of the cluster
  @param group: if set, only check for consistency within this group.
  @raise errors.OpPrereqError: if the LU attempts to change the access
      parameter to an invalid value, such as "pink bunny".
  @raise errors.OpPrereqError: if the LU attempts to change the access
      parameter to an inconsistent value, such as asking for RBD
      userspace access to the chroot hypervisor.

  """
  CheckDiskAccessModeValidity(parameters)

  for disk_template in parameters:
    access = parameters[disk_template].get(constants.LDP_ACCESS,
                                           constants.DISK_KERNELSPACE)

    if disk_template not in constants.DTS_HAVE_ACCESS:
      continue

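    # Check every instance in the given scope (node group or whole cluster)
    # that uses this disk template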
    inst_uuids = cfg.GetNodeGroupInstances(group) if group else \
      cfg.GetInstanceList()

    for entry in inst_uuids:
      inst = cfg.GetInstanceInfo(entry)
      inst_template = inst.disk_template

      if inst_template != disk_template:
        continue

      hv = inst.hypervisor

      if not IsValidDiskAccessModeCombination(hv, inst_template, access):
        raise errors.OpPrereqError("Instance {i}: cannot use '{a}' access"
                                   " setting with {h} hypervisor and {d} disk"
                                   " type.".format(i=inst.name,
                                                   a=access,
                                                   h=hv,
                                                   d=inst_template))


def IsValidDiskAccessModeCombination(hv, disk_template, mode):
  """Checks if a hypervisor can read a disk template with given mode.

  @param hv: the hypervisor that will access the data
  @param disk_template: the disk template the data is stored as
  @param mode: how the hypervisor should access the data
  @return: True if the hypervisor can read the given disk_template
      in the specified mode.

  """
  if mode == constants.DISK_KERNELSPACE:
    return True

  if (hv == constants.HT_KVM and
      disk_template in (constants.DT_RBD, constants.DT_GLUSTER) and
      mode == constants.DISK_USERSPACE):
    return True

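  # Everything else is unsupported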
  return False


1259 """Removes the node's certificate from the candidate certificates list.
1260
1261 @type node_uuid: string
1262 @param node_uuid: the node's UUID
1263 @type cluster: C{objects.Cluster}
1264 @param cluster: the cluster's configuration
1265
1266 """
1267 utils.RemoveNodeFromCandidateCerts(node_uuid, cluster.candidate_certs)
1268
1269


def EnsureKvmdOnNodes(lu, feedback_fn, nodes=None):
  """Ensure KVM daemon is running on nodes with KVM instances.

  If user shutdown is enabled in the cluster:
    - The KVM daemon will be started on VM capable nodes containing
      KVM instances.
    - The KVM daemon will be stopped on non VM capable nodes.

  If user shutdown is disabled in the cluster:
    - The KVM daemon will be stopped on all nodes

  Issues a warning for each failed RPC call.

  @type lu: L{LogicalUnit}
  @param lu: logical unit on whose behalf we execute

  @type feedback_fn: callable
  @param feedback_fn: feedback function

  @type nodes: list of string
  @param nodes: if supplied, it overrides the node uuids to start/stop;
      this is used mainly for optimization

  """
  cluster = lu.cfg.GetClusterInfo()

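  # Either use the nodes given by the caller or operate on all nodes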
  if nodes is not None:
    node_uuids = set(nodes)
  else:
    node_uuids = lu.cfg.GetNodeList()

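  # Decide where the KVM daemon must be started and where it must be stopped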
  if constants.HT_KVM in cluster.enabled_hypervisors and \
      cluster.enabled_user_shutdown:
    start_nodes = []
    stop_nodes = []

    for node_uuid in node_uuids:
      if lu.cfg.GetNodeInfo(node_uuid).vm_capable:
        start_nodes.append(node_uuid)
      else:
        stop_nodes.append(node_uuid)
  else:
    start_nodes = []
    stop_nodes = node_uuids

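  # Start the KVM daemon where needed, warning on any failure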
  if start_nodes:
    results = lu.rpc.call_node_ensure_daemon(start_nodes, constants.KVMD, True)
    for node_uuid in start_nodes:
      results[node_uuid].Warn("Failed to start KVM daemon in node '%s'" %
                              node_uuid, feedback_fn)

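  # Stop the KVM daemon everywhere else, warning on any failure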
  if stop_nodes:
    results = lu.rpc.call_node_ensure_daemon(stop_nodes, constants.KVMD, False)
    for node_uuid in stop_nodes:
      results[node_uuid].Warn("Failed to stop KVM daemon in node '%s'" %
                              node_uuid, feedback_fn)