31 """Common functions used by multiple logical units."""
32
33 import copy
34 import math
35 import os
36 import urllib2
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hypervisor
42 from ganeti import locking
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import pathutils
46 import ganeti.rpc.node as rpc
47 from ganeti.serializer import Private
48 from ganeti import ssconf
49 from ganeti import utils
50
51
52
53 INSTANCE_DOWN = [constants.ADMINST_DOWN]
54 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
55 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
56
57
58 def _ExpandItemName(expand_fn, name, kind):
59 """Expand an item name.
60
61 @param expand_fn: the function to use for expansion
62 @param name: requested item name
63 @param kind: text description ('Node' or 'Instance')
64 @return: the result of the expand_fn, if successful
65 @raise errors.OpPrereqError: if the item is not found
66
67 """
68 (uuid, full_name) = expand_fn(name)
69 if uuid is None or full_name is None:
70 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
71 errors.ECODE_NOENT)
72 return (uuid, full_name)
73
74
75 def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
76 """Wrapper over L{_ExpandItemName} for instance."""
77 (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
78 if expected_uuid is not None and uuid != expected_uuid:
79 raise errors.OpPrereqError(
80 "The instances UUID '%s' does not match the expected UUID '%s' for"
81 " instance '%s'. Maybe the instance changed since you submitted this"
82 " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
83 return (uuid, full_name)
84
85
86 def ExpandNodeUuidAndName(cfg, expected_uuid, name):
87 """Expand a short node name into the node UUID and full name.
88
89 @type cfg: L{config.ConfigWriter}
90 @param cfg: The cluster configuration
91 @type expected_uuid: string
92 @param expected_uuid: expected UUID for the node (or None if there is no
93 expectation). If it does not match, a L{errors.OpPrereqError} is
94 raised.
95 @type name: string
96 @param name: the short node name
97
98 """
99 (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
100 if expected_uuid is not None and uuid != expected_uuid:
101 raise errors.OpPrereqError(
102 "The nodes UUID '%s' does not match the expected UUID '%s' for node"
103 " '%s'. Maybe the node changed since you submitted this job." %
104 (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
105 return (uuid, full_name)
106
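# Illustrative usage (not part of the original module; the opcode field name
# is an example only): a logical unit would typically expand a user-supplied
# short name into a (uuid, full name) pair like this:
#
#   (node_uuid, node_name) = ExpandNodeUuidAndName(self.cfg, None,
#                                                  self.op.node_name)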
107
108 def ShareAll():
109 """Returns a dict declaring all lock levels shared.
110
111 """
112 return dict.fromkeys(locking.LEVELS, 1)
113
114
115 def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
116 """Checks if the instances in a node group are still correct.
117
118 @type cfg: L{config.ConfigWriter}
119 @param cfg: The cluster configuration
120 @type group_uuid: string
121 @param group_uuid: Node group UUID
122 @type owned_instance_names: set or frozenset
123 @param owned_instance_names: List of currently owned instances
124
125 """
126 wanted_instances = frozenset(cfg.GetInstanceNames(
127 cfg.GetNodeGroupInstances(group_uuid)))
128 if owned_instance_names != wanted_instances:
129 raise errors.OpPrereqError("Instances in node group '%s' changed since"
130 " locks were acquired, wanted '%s', have '%s';"
131 " retry the operation" %
132 (group_uuid,
133 utils.CommaJoin(wanted_instances),
134 utils.CommaJoin(owned_instance_names)),
135 errors.ECODE_STATE)
136
137 return wanted_instances
138
139
140 def GetWantedNodes(lu, short_node_names):
141 """Returns list of checked and expanded node names.
142
143 @type lu: L{LogicalUnit}
144 @param lu: the logical unit on whose behalf we execute
145 @type short_node_names: list
146 @param short_node_names: list of node names or None for all nodes
147 @rtype: tuple of lists
148 @return: tuple with (list of node UUIDs, list of node names)
149 @raise errors.ProgrammerError: if the nodes parameter is wrong type
150
151 """
152 if short_node_names:
153 node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
154 for name in short_node_names]
155 else:
156 node_uuids = lu.cfg.GetNodeList()
157
158 return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
159
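# Illustrative usage (not part of the original module; the opcode field name
# is an example only): an LU that accepts an optional node list would call,
# e.g.:
#
#   (self.node_uuids, self.node_names) = GetWantedNodes(self, self.op.nodes)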
160
161 def GetWantedInstances(lu, short_inst_names):
162 """Returns list of checked and expanded instance names.
163
164 @type lu: L{LogicalUnit}
165 @param lu: the logical unit on whose behalf we execute
166 @type short_inst_names: list
167 @param short_inst_names: list of instance names or None for all instances
168 @rtype: tuple of lists
169 @return: tuple of (instance UUIDs, instance names)
170 @raise errors.OpPrereqError: if the instances parameter is wrong type
171 @raise errors.OpPrereqError: if any of the passed instances is not found
172
173 """
174 if short_inst_names:
175 inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
176 for name in short_inst_names]
177 else:
178 inst_uuids = lu.cfg.GetInstanceList()
179 return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])
180
181
182 def RunPostHook(lu, node_name):
183 """Runs the post-hook for an opcode on a single node.
184
185 """
186 hm = lu.proc.BuildHooksManager(lu)
187 try:
188 hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
189 except Exception, err:
190 lu.LogWarning("Errors occurred running hooks on %s: %s",
191 node_name, err)
192
193
194 def RedistributeAncillaryFiles(lu):
195 """Distribute additional files which are part of the cluster configuration.
196
197 ConfigWriter takes care of distributing the config and ssconf files, but
198 there are more files which should be distributed to all nodes. This function
199 makes sure those are copied.
200
201 """
202
203 cluster = lu.cfg.GetClusterInfo()
204 master_info = lu.cfg.GetMasterNodeInfo()
205
206 online_node_uuids = lu.cfg.GetOnlineNodeList()
207 online_node_uuid_set = frozenset(online_node_uuids)
208 vm_node_uuids = list(online_node_uuid_set.intersection(
209 lu.cfg.GetVmCapableNodeList()))
210
211
212 for node_uuids in [online_node_uuids, vm_node_uuids]:
213 if master_info.uuid in node_uuids:
214 node_uuids.remove(master_info.uuid)
215
216
217 (files_all, _, files_mc, files_vm) = \
218 ComputeAncillaryFiles(cluster, True)
219
220
221 assert not (pathutils.CLUSTER_CONF_FILE in files_all or
222 pathutils.CLUSTER_CONF_FILE in files_vm)
223 assert not files_mc, "Master candidates not handled in this function"
224
225 filemap = [
226 (online_node_uuids, files_all),
227 (vm_node_uuids, files_vm),
228 ]
229
230
231 for (node_uuids, files) in filemap:
232 for fname in files:
233 UploadHelper(lu, node_uuids, fname)
234
235
236 def ComputeAncillaryFiles(cluster, redist):
237 """Compute files external to Ganeti which need to be consistent.
238
239 @type redist: boolean
240 @param redist: Whether to include files which need to be redistributed
241
242 """
243
244 files_all = set([
245 pathutils.SSH_KNOWN_HOSTS_FILE,
246 pathutils.CONFD_HMAC_KEY,
247 pathutils.CLUSTER_DOMAIN_SECRET_FILE,
248 pathutils.SPICE_CERT_FILE,
249 pathutils.SPICE_CACERT_FILE,
250 pathutils.RAPI_USERS_FILE,
251 ])
252
253 if redist:
254
255 files_all.add(pathutils.RAPI_CERT_FILE)
256 else:
257 files_all.update(pathutils.ALL_CERT_FILES)
258 files_all.update(ssconf.SimpleStore().GetFileList())
259
260 if cluster.modify_etc_hosts:
261 files_all.add(pathutils.ETC_HOSTS)
262
263 if cluster.use_external_mip_script:
264 files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
265
266
267
268
269 files_opt = set([
270 pathutils.RAPI_USERS_FILE,
271 ])
272
273
274 files_mc = set()
275
276 if not redist:
277 files_mc.add(pathutils.CLUSTER_CONF_FILE)
278
279
280 if (not redist and (cluster.IsFileStorageEnabled() or
281 cluster.IsSharedFileStorageEnabled())):
282 files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
283 files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
284
285
286 files_vm = set(
287 filename
288 for hv_name in cluster.enabled_hypervisors
289 for filename in
290 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
291
292 files_opt |= set(
293 filename
294 for hv_name in cluster.enabled_hypervisors
295 for filename in
296 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
297
298
299 all_files_set = files_all | files_mc | files_vm
300 assert (len(all_files_set) ==
301 sum(map(len, [files_all, files_mc, files_vm]))), \
302 "Found file listed in more than one file list"
303
304
305 assert all_files_set.issuperset(files_opt), \
306 "Optional file not in a different required list"
307
308
309 assert not (redist and
310 pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
311
312 return (files_all, files_opt, files_mc, files_vm)
313
314
315 def UploadHelper(lu, node_uuids, fname):
316 """Helper for uploading a file and showing warnings.
317
318 """
319 if os.path.exists(fname):
320 result = lu.rpc.call_upload_file(node_uuids, fname)
321 for to_node_uuids, to_result in result.items():
322 msg = to_result.fail_msg
323 if msg:
324 msg = ("Copy of file %s to node %s failed: %s" %
325 (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
326 lu.LogWarning(msg)
327
328
329 def MergeAndVerifyHvState(op_input, obj_input):
330 """Combines the hv state from an opcode with the one of the object
331
332 @param op_input: The input dict from the opcode
333 @param obj_input: The input dict from the objects
334 @return: The verified and updated dict
335
336 """
337 if op_input:
338 invalid_hvs = set(op_input) - constants.HYPER_TYPES
339 if invalid_hvs:
340 raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
341 " %s" % utils.CommaJoin(invalid_hvs),
342 errors.ECODE_INVAL)
343 if obj_input is None:
344 obj_input = {}
345 type_check = constants.HVSTS_PARAMETER_TYPES
346 return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
347
348 return None
349
350
351 def MergeAndVerifyDiskState(op_input, obj_input):
352 """Combines the disk state from an opcode with the one of the object
353
354 @param op_input: The input dict from the opcode
355 @param obj_input: The input dict from the objects
356 @return: The verified and updated dict
357 """
358 if op_input:
359 invalid_dst = set(op_input) - constants.DS_VALID_TYPES
360 if invalid_dst:
361 raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
362 utils.CommaJoin(invalid_dst),
363 errors.ECODE_INVAL)
364 type_check = constants.DSS_PARAMETER_TYPES
365 if obj_input is None:
366 obj_input = {}
367 return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
368 type_check))
369 for key, value in op_input.items())
370
371 return None
372
373
374 def CheckOSParams(lu, required, node_uuids, osname, osparams, force_variant):
375 """OS parameters validation.
376
377 @type lu: L{LogicalUnit}
378 @param lu: the logical unit for which we check
379 @type required: boolean
380 @param required: whether the validation should fail if the OS is not
381 found
382 @type node_uuids: list
383 @param node_uuids: the list of nodes on which we should check
384 @type osname: string
385 @param osname: the name of the OS we should use
386 @type osparams: dict
387 @param osparams: the parameters which we need to check
388 @raise errors.OpPrereqError: if the parameters are not valid
389
390 """
391 node_uuids = _FilterVmNodes(lu, node_uuids)
392
393
394 for key in osparams:
395 if isinstance(osparams[key], Private):
396 osparams[key] = osparams[key].Get()
397
398 if osname:
399 result = lu.rpc.call_os_validate(node_uuids, required, osname,
400 [constants.OS_VALIDATE_PARAMETERS],
401 osparams, force_variant)
402 for node_uuid, nres in result.items():
403
404
405 nres.Raise("OS Parameters validation failed on node %s" %
406 lu.cfg.GetNodeName(node_uuid))
407 if not nres.payload:
408 lu.LogInfo("OS %s not found on node %s, validation skipped",
409 osname, lu.cfg.GetNodeName(node_uuid))
410
411
412 def CheckImageValidity(image, error_message):
413 """Checks if a given image description is either a valid file path or a URL.
414
415 @type image: string
416 @param image: An absolute path or URL, the assumed location of a disk image.
417 @type error_message: string
418 @param error_message: The error message to show if the image is not valid.
419
420 @raise errors.OpPrereqError: If the validation fails.
421
422 """
423 if image is not None and not (utils.IsUrl(image) or os.path.isabs(image)):
424 raise errors.OpPrereqError(error_message)
425
426
427 def CheckOSImage(op):
428 """Checks if the OS image in the OS parameters of an opcode is
429 valid.
430
431 This function can also be used in LUs as they carry an opcode.
432
433 @type op: L{opcodes.OpCode}
434 @param op: opcode containing the OS params
435
436 @rtype: string or NoneType
437 @return:
438 None if the OS parameters in the opcode do not contain the OS
439 image, otherwise the OS image value contained in the OS parameters
440 @raise errors.OpPrereqError: if OS image is not a URL or an absolute path
441
442 """
443 os_image = objects.GetOSImage(op.osparams)
444 CheckImageValidity(os_image, "OS image must be an absolute path or a URL")
445 return os_image
446
447
448 def CheckHVParams(lu, node_uuids, hvname, hvparams):
449 """Hypervisor parameter validation.
450
451 This function abstracts the hypervisor parameter validation to be
452 used in both instance create and instance modify.
453
454 @type lu: L{LogicalUnit}
455 @param lu: the logical unit for which we check
456 @type node_uuids: list
457 @param node_uuids: the list of nodes on which we should check
458 @type hvname: string
459 @param hvname: the name of the hypervisor we should use
460 @type hvparams: dict
461 @param hvparams: the parameters which we need to check
462 @raise errors.OpPrereqError: if the parameters are not valid
463
464 """
465 node_uuids = _FilterVmNodes(lu, node_uuids)
466
467 cluster = lu.cfg.GetClusterInfo()
468 hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
469
470 hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
471 for node_uuid in node_uuids:
472 info = hvinfo[node_uuid]
473 if info.offline:
474 continue
475 info.Raise("Hypervisor parameter validation failed on node %s" %
476 lu.cfg.GetNodeName(node_uuid))
477
478
494
495
496 def CheckNodePVs(nresult, exclusive_storage):
497 """Check node PVs.
498
499 """
500 pvlist_dict = nresult.get(constants.NV_PVLIST, None)
501 if pvlist_dict is None:
502 return (["Can't get PV list from node"], None)
503 pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
504 errlist = []
505
506
507
508 for pv in pvlist:
509 if ":" in pv.name:
510 errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
511 (pv.name, pv.vg_name))
512 es_pvinfo = None
513 if exclusive_storage:
514 (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
515 errlist.extend(errmsgs)
516 shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
517 if shared_pvs:
518 for (pvname, lvlist) in shared_pvs:
519
520 errlist.append("PV %s is shared among unrelated LVs (%s)" %
521 (pvname, utils.CommaJoin(lvlist)))
522 return (errlist, es_pvinfo)
523
524
525 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
526 """Computes if value is in the desired range.
527
528 @param name: name of the parameter for which we perform the check
529 @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
530 not just 'disk')
531 @param ispecs: dictionary containing min and max values
532 @param value: actual value that we want to use
533 @return: None or an error string
534
535 """
536 if value in [None, constants.VALUE_AUTO]:
537 return None
538 max_v = ispecs[constants.ISPECS_MAX].get(name, value)
539 min_v = ispecs[constants.ISPECS_MIN].get(name, value)
540 if value > max_v or min_v > value:
541 if qualifier:
542 fqn = "%s/%s" % (name, qualifier)
543 else:
544 fqn = name
545 return ("%s value %s is not in range [%s, %s]" %
546 (fqn, value, min_v, max_v))
547 return None
548
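# Illustrative sketch (not part of the original module; the ispecs values are
# made up): with ispecs = {constants.ISPECS_MIN: {"disk-size": 1024},
# constants.ISPECS_MAX: {"disk-size": 4096}}, a call such as
# _ComputeMinMaxSpec("disk-size", "0", ispecs, 512) would return
# "disk-size/0 value 512 is not in range [1024, 4096]", while a value of
# 2048 (or constants.VALUE_AUTO) would return None.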
549
550 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
551 nic_count, disk_sizes, spindle_use, disk_types,
552 _compute_fn=_ComputeMinMaxSpec):
553 """Verifies ipolicy against provided specs.
554
555 @type ipolicy: dict
556 @param ipolicy: The ipolicy
557 @type mem_size: int
558 @param mem_size: The memory size
559 @type cpu_count: int
560 @param cpu_count: Used cpu cores
561 @type disk_count: int
562 @param disk_count: Number of disks used
563 @type nic_count: int
564 @param nic_count: Number of nics used
565 @type disk_sizes: list of ints
566 @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
567 @type spindle_use: int
568 @param spindle_use: The number of spindles this instance uses
569 @type disk_types: list of strings
570 @param disk_types: The disk template of the instance
571 @param _compute_fn: The compute function (unittest only)
572 @return: A list of violations, or an empty list if no violations are found
573
574 """
575 assert disk_count == len(disk_sizes)
576 assert isinstance(disk_types, list)
577 assert disk_count == len(disk_types)
578
579 test_settings = [
580 (constants.ISPEC_MEM_SIZE, "", mem_size),
581 (constants.ISPEC_CPU_COUNT, "", cpu_count),
582 (constants.ISPEC_NIC_COUNT, "", nic_count),
583 (constants.ISPEC_SPINDLE_USE, "", spindle_use),
584 ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
585 for idx, d in enumerate(disk_sizes)]
586
587 allowed_dts = set(ipolicy[constants.IPOLICY_DTS])
588 ret = []
589 if disk_count != 0:
590
591 test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
592 elif constants.DT_DISKLESS not in allowed_dts:
593 ret.append("Disk template %s is not allowed (allowed templates %s)" %
594 (constants.DT_DISKLESS, utils.CommaJoin(allowed_dts)))
595
596 forbidden_dts = set(disk_types) - allowed_dts
597 if forbidden_dts:
598 ret.append("Disk template %s is not allowed (allowed templates: %s)" %
599 (utils.CommaJoin(forbidden_dts), utils.CommaJoin(allowed_dts)))
600
601 min_errs = None
602 for minmax in ipolicy[constants.ISPECS_MINMAX]:
603 errs = filter(None,
604 (_compute_fn(name, qualifier, minmax, value)
605 for (name, qualifier, value) in test_settings))
606 if min_errs is None or len(errs) < len(min_errs):
607 min_errs = errs
608 assert min_errs is not None
609 return ret + min_errs
610
611
612 def ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, disks,
613 _compute_fn=ComputeIPolicySpecViolation):
614 """Verifies ipolicy against provided disk sizes.
615
616 No other specs except the disk sizes, the number of disks and the disk
617 template are checked.
618
619 @type ipolicy: dict
620 @param ipolicy: The ipolicy
621 @type disk_sizes: list of ints
622 @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
623 @type disks: list of L{Disk}
624 @param disks: The Disk objects of the instance
625 @param _compute_fn: The compute function (unittest only)
626 @return: A list of violations, or an empty list if no violations are found
627
628 """
629 if len(disk_sizes) != len(disks):
630 return [constants.ISPEC_DISK_COUNT]
631 dev_types = [d.dev_type for d in disks]
632 return ComputeIPolicySpecViolation(ipolicy,
633
634 None, None, len(disk_sizes),
635 None, disk_sizes,
636 None,
637 dev_types,
638 _compute_fn=_compute_fn)
639
640
641 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
642 _compute_fn=ComputeIPolicySpecViolation):
643 """Compute if instance meets the specs of ipolicy.
644
645 @type ipolicy: dict
646 @param ipolicy: The ipolicy to verify against
647 @type instance: L{objects.Instance}
648 @param instance: The instance to verify
649 @type cfg: L{config.ConfigWriter}
650 @param cfg: Cluster configuration
651 @param _compute_fn: The function to verify ipolicy (unittest only)
652 @see: L{ComputeIPolicySpecViolation}
653
654 """
655 ret = []
656 be_full = cfg.GetClusterInfo().FillBE(instance)
657 mem_size = be_full[constants.BE_MAXMEM]
658 cpu_count = be_full[constants.BE_VCPUS]
659 inst_nodes = cfg.GetInstanceNodes(instance.uuid)
660 es_flags = rpc.GetExclusiveStorageForNodes(cfg, inst_nodes)
661 disks = cfg.GetInstanceDisks(instance.uuid)
662 if any(es_flags.values()):
663
664 try:
665 spindle_use = sum([disk.spindles for disk in disks])
666 except TypeError:
667 ret.append("Number of spindles not configured for disks of instance %s"
668 " while exclusive storage is enabled, try running gnt-cluster"
669 " repair-disk-sizes" % instance.name)
670
671 spindle_use = None
672 else:
673 spindle_use = be_full[constants.BE_SPINDLE_USE]
674 disk_count = len(disks)
675 disk_sizes = [disk.size for disk in disks]
676 nic_count = len(instance.nics)
677 disk_types = [d.dev_type for d in disks]
678
679 return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
680 disk_sizes, spindle_use, disk_types)
681
682
683 def _ComputeViolatingInstances(ipolicy, instances, cfg):
684 """Computes a set of instances who violates given ipolicy.
685
686 @param ipolicy: The ipolicy to verify
687 @type instances: L{objects.Instance}
688 @param instances: List of instances to verify
689 @type cfg: L{config.ConfigWriter}
690 @param cfg: Cluster configuration
691 @return: A frozenset of instance names violating the ipolicy
692
693 """
694 return frozenset([inst.name for inst in instances
695 if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
696
697
698 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
699 """Computes a set of any instances that would violate the new ipolicy.
700
701 @param old_ipolicy: The current (still in-place) ipolicy
702 @param new_ipolicy: The new (to become) ipolicy
703 @param instances: List of instances to verify
704 @type cfg: L{config.ConfigWriter}
705 @param cfg: Cluster configuration
706 @return: A list of instances which violates the new ipolicy but
707 did not before
708
709 """
710 return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
711 _ComputeViolatingInstances(old_ipolicy, instances, cfg))
712
713
714 def GetUpdatedParams(old_params, update_dict,
715 use_default=True, use_none=False):
716 """Return the new version of a parameter dictionary.
717
718 @type old_params: dict
719 @param old_params: old parameters
720 @type update_dict: dict
721 @param update_dict: dict containing new parameter values, or
722 constants.VALUE_DEFAULT to reset the parameter to its default
723 value
724 @type use_default: boolean
725 @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
726 values as 'to be deleted' values
727 @type use_none: boolean
728 @param use_none: whether to recognise C{None} values as 'to be
729 deleted' values
730 @rtype: dict
731 @return: the new parameter dictionary
732
733 """
734 params_copy = copy.deepcopy(old_params)
735 for key, val in update_dict.iteritems():
736 if ((use_default and val == constants.VALUE_DEFAULT) or
737 (use_none and val is None)):
738 try:
739 del params_copy[key]
740 except KeyError:
741 pass
742 else:
743 params_copy[key] = val
744 return params_copy
745
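# Illustrative sketch (not part of the original module; the keys are made
# up): given the logic above, constants.VALUE_DEFAULT entries drop the key
# while other entries overwrite or add it:
#
#   GetUpdatedParams({"a": 1, "b": 2},
#                    {"b": constants.VALUE_DEFAULT, "c": 3})
#   # -> {"a": 1, "c": 3}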
746
747 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
748 """Return the new version of an instance policy.
749
750 @param group_policy: whether this policy applies to a group and thus
751 we should support removal of policy entries
752
753 """
754 ipolicy = copy.deepcopy(old_ipolicy)
755 for key, value in new_ipolicy.items():
756 if key not in constants.IPOLICY_ALL_KEYS:
757 raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
758 errors.ECODE_INVAL)
759 if (not value or value == [constants.VALUE_DEFAULT] or
760 value == constants.VALUE_DEFAULT):
761 if group_policy:
762 if key in ipolicy:
763 del ipolicy[key]
764 else:
765 raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
766 " on the cluster'" % key,
767 errors.ECODE_INVAL)
768 else:
769 if key in constants.IPOLICY_PARAMETERS:
770
771 try:
772 ipolicy[key] = float(value)
773 except (TypeError, ValueError), err:
774 raise errors.OpPrereqError("Invalid value for attribute"
775 " '%s': '%s', error: %s" %
776 (key, value, err), errors.ECODE_INVAL)
777 elif key == constants.ISPECS_MINMAX:
778 for minmax in value:
779 for k in minmax.keys():
780 utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
781 ipolicy[key] = value
782 elif key == constants.ISPECS_STD:
783 if group_policy:
784 msg = "%s cannot appear in group instance specs" % key
785 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
786 ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
787 use_none=False, use_default=False)
788 utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
789 else:
790
791
792 ipolicy[key] = list(value)
793 try:
794 objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
795 except errors.ConfigurationError, err:
796 raise errors.OpPrereqError("Invalid instance policy: %s" % err,
797 errors.ECODE_INVAL)
798 return ipolicy
799
800
801 def AnnotateDiskParams(instance, devs, cfg):
802 """Little helper wrapper to the rpc annotation method.
803
804 @param instance: The instance object
805 @type devs: List of L{objects.Disk}
806 @param devs: The root devices (not any of its children!)
807 @param cfg: The config object
808 @return: The annotated disk copies
809 @see: L{ganeti.rpc.node.AnnotateDiskParams}
810
811 """
812 return rpc.AnnotateDiskParams(devs, cfg.GetInstanceDiskParams(instance))
813
814
815 def SupportsOob(cfg, node):
816 """Tells if node supports OOB.
817
818 @type cfg: L{config.ConfigWriter}
819 @param cfg: The cluster configuration
820 @type node: L{objects.Node}
821 @param node: The node
822 @return: The OOB script if supported or an empty string otherwise
823
824 """
825 return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
826
827
828 def _UpdateAndVerifySubDict(base, updates, type_check):
829 """Updates and verifies a dict with sub dicts of the same type.
830
831 @param base: The dict with the old data
832 @param updates: The dict with the new data
833 @param type_check: Dict suitable to ForceDictType to verify correct types
834 @returns: A new dict with updated and verified values
835
836 """
837 def fn(old, value):
838 new = GetUpdatedParams(old, value)
839 utils.ForceDictType(new, type_check)
840 return new
841
842 ret = copy.deepcopy(base)
843 ret.update(dict((key, fn(base.get(key, {}), value))
844 for key, value in updates.items()))
845 return ret
846
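# Illustrative sketch (not part of the original module; the keys and the
# type_check map are made up): per-key sub-dicts from `updates` are merged
# into `base` and each merged sub-dict is checked with utils.ForceDictType:
#
#   base    = {"plain": {"a": 1}}
#   updates = {"plain": {"b": 2}, "drbd": {"c": 3}}
#   _UpdateAndVerifySubDict(base, updates, type_check)
#   # -> {"plain": {"a": 1, "b": 2}, "drbd": {"c": 3}}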
847
848 def _FilterVmNodes(lu, node_uuids):
849 """Filters out non-vm_capable nodes from a list.
850
851 @type lu: L{LogicalUnit}
852 @param lu: the logical unit for which we check
853 @type node_uuids: list
854 @param node_uuids: the list of nodes on which we should check
855 @rtype: list
856 @return: the list of vm-capable nodes
857
858 """
859 vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
860 return [uuid for uuid in node_uuids if uuid not in vm_nodes]
861
862
863 def GetDefaultIAllocator(cfg, ialloc):
864 """Decides on which iallocator to use.
865
866 @type cfg: L{config.ConfigWriter}
867 @param cfg: Cluster configuration object
868 @type ialloc: string or None
869 @param ialloc: Iallocator specified in opcode
870 @rtype: string
871 @return: Iallocator name
872
873 """
874 if not ialloc:
875
876 ialloc = cfg.GetDefaultIAllocator()
877
878 if not ialloc:
879 raise errors.OpPrereqError("No iallocator was specified, neither in the"
880 " opcode nor as a cluster-wide default",
881 errors.ECODE_INVAL)
882
883 return ialloc
884
885
886 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
887 cur_group_uuid):
888 """Checks if node groups for locked instances are still correct.
889
890 @type cfg: L{config.ConfigWriter}
891 @param cfg: Cluster configuration
892 @type instances: dict; string as key, L{objects.Instance} as value
893 @param instances: Dictionary, instance UUID as key, instance object as value
894 @type owned_groups: iterable of string
895 @param owned_groups: List of owned groups
896 @type owned_node_uuids: iterable of string
897 @param owned_node_uuids: List of owned nodes
898 @type cur_group_uuid: string or None
899 @param cur_group_uuid: Optional group UUID to check against instance's groups
900
901 """
902 for (uuid, inst) in instances.items():
903 inst_nodes = cfg.GetInstanceNodes(inst.uuid)
904 assert owned_node_uuids.issuperset(inst_nodes), \
905 "Instance %s's nodes changed while we kept the lock" % inst.name
906
907 inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)
908
909 assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
910 "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
911
912
913 def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
914 """Checks if the owned node groups are still correct for an instance.
915
916 @type cfg: L{config.ConfigWriter}
917 @param cfg: The cluster configuration
918 @type inst_uuid: string
919 @param inst_uuid: Instance UUID
920 @type owned_groups: set or frozenset
921 @param owned_groups: List of currently owned node groups
922 @type primary_only: boolean
923 @param primary_only: Whether to check node groups for only the primary node
924
925 """
926 inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)
927
928 if not owned_groups.issuperset(inst_groups):
929 raise errors.OpPrereqError("Instance %s's node groups changed since"
930 " locks were acquired, current groups are"
931 " are '%s', owning groups '%s'; retry the"
932 " operation" %
933 (cfg.GetInstanceName(inst_uuid),
934 utils.CommaJoin(inst_groups),
935 utils.CommaJoin(owned_groups)),
936 errors.ECODE_STATE)
937
938 return inst_groups
939
940
941 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
942 """Unpacks the result of change-group and node-evacuate iallocator requests.
943
944 Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
945 L{constants.IALLOCATOR_MODE_CHG_GROUP}.
946
947 @type lu: L{LogicalUnit}
948 @param lu: Logical unit instance
949 @type alloc_result: tuple/list
950 @param alloc_result: Result from iallocator
951 @type early_release: bool
952 @param early_release: Whether to release locks early if possible
953 @type use_nodes: bool
954 @param use_nodes: Whether to display node names instead of groups
955
956 """
957 (moved, failed, jobs) = alloc_result
958
959 if failed:
960 failreason = utils.CommaJoin("%s (%s)" % (name, reason)
961 for (name, reason) in failed)
962 lu.LogWarning("Unable to evacuate instances %s", failreason)
963 raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
964
965 if moved:
966 lu.LogInfo("Instances to be moved: %s",
967 utils.CommaJoin(
968 "%s (to %s)" %
969 (name, _NodeEvacDest(use_nodes, group, node_names))
970 for (name, group, node_names) in moved))
971
972 return [map(compat.partial(_SetOpEarlyRelease, early_release),
973 map(opcodes.OpCode.LoadOpCode, ops))
974 for ops in jobs]
975
976
977 def _NodeEvacDest(use_nodes, group, node_names):
978 """Returns group or nodes depending on caller's choice.
979
980 """
981 if use_nodes:
982 return utils.CommaJoin(node_names)
983 else:
984 return group
985
986
987 def _SetOpEarlyRelease(early_release, op):
988 """Sets C{early_release} flag on opcodes if available.
989
990 """
991 try:
992 op.early_release = early_release
993 except AttributeError:
994 assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
995
996 return op
997
998
999 def MapInstanceLvsToNodes(cfg, instances):
1000 """Creates a map from (node, volume) to instance name.
1001
1002 @type cfg: L{config.ConfigWriter}
1003 @param cfg: The cluster configuration
1004 @type instances: list of L{objects.Instance}
1005 @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
1006 object as value
1007
1008 """
1009 return dict(
1010 ((node_uuid, vol), inst)
1011 for inst in instances
1012 for (node_uuid, vols) in cfg.GetInstanceLVsByNode(inst.uuid).items()
1013 for vol in vols)
1014
1015
1016 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
1017 """Make sure that none of the given paramters is global.
1018
1019 If a global parameter is found, an L{errors.OpPrereqError} exception is
1020 raised. This is used to avoid setting global parameters for individual nodes.
1021
1022 @type params: dictionary
1023 @param params: Parameters to check
1024 @type glob_pars: dictionary
1025 @param glob_pars: Forbidden parameters
1026 @type kind: string
1027 @param kind: Kind of parameters (e.g. "node")
1028 @type bad_levels: string
1029 @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
1030 "instance")
1031 @type good_levels: strings
1032 @param good_levels: Level(s) at which the parameters are allowed (e.g.
1033 "cluster or group")
1034
1035 """
1036 used_globals = glob_pars.intersection(params)
1037 if used_globals:
1038 msg = ("The following %s parameters are global and cannot"
1039 " be customized at %s level, please modify them at"
1040 " %s level: %s" %
1041 (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
1042 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1043
1044
1045 def IsExclusiveStorageEnabledNode(cfg, node):
1046 """Whether exclusive_storage is in effect for the given node.
1047
1048 @type cfg: L{config.ConfigWriter}
1049 @param cfg: The cluster configuration
1050 @type node: L{objects.Node}
1051 @param node: The node
1052 @rtype: bool
1053 @return: The effective value of exclusive_storage
1054
1055 """
1056 return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
1057
1058
1059 def IsInstanceRunning(lu, instance, prereq=True):
1060 """Given an instance object, checks if the instance is running.
1061
1062 This function asks the backend whether the instance is running; instances
1063 shut down by the user are considered not to be running.
1064
1065 @type lu: L{LogicalUnit}
1066 @param lu: LU on behalf of which we make the check
1067
1068 @type instance: L{objects.Instance}
1069 @param instance: instance to check whether it is running
1070
1071 @rtype: bool
1072 @return: 'True' if the instance is running, 'False' otherwise
1073
1074 """
1075 hvparams = lu.cfg.GetClusterInfo().FillHV(instance)
1076 result = lu.rpc.call_instance_info(instance.primary_node, instance.name,
1077 instance.hypervisor, hvparams)
1078
1079
1080 result.Raise("Can't retrieve instance information for instance '%s'" %
1081 instance.name, prereq=prereq, ecode=errors.ECODE_ENVIRON)
1082
1083 return result.payload and \
1084 "state" in result.payload and \
1085 (result.payload["state"] != hypervisor.hv_base.HvInstanceState.SHUTDOWN)
1086
1087
1088 def CheckInstanceState(lu, instance, req_states, msg=None):
1089 """Ensure that an instance is in one of the required states.
1090
1091 @param lu: the LU on behalf of which we make the check
1092 @param instance: the instance to check
1093 @param msg: if passed, should be a message to replace the default one
1094 @raise errors.OpPrereqError: if the instance is not in the required state
1095
1096 """
1097 if msg is None:
1098 msg = ("can't use instance from outside %s states" %
1099 utils.CommaJoin(req_states))
1100 if instance.admin_state not in req_states:
1101 raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
1102 (instance.name, instance.admin_state, msg),
1103 errors.ECODE_STATE)
1104
1105 if constants.ADMINST_UP not in req_states:
1106 pnode_uuid = instance.primary_node
1107
1108 if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
1109 if IsInstanceRunning(lu, instance):
1110 raise errors.OpPrereqError("Instance %s is running, %s" %
1111 (instance.name, msg), errors.ECODE_STATE)
1112 else:
1113 lu.LogWarning("Primary node offline, ignoring check that instance"
1114 " is down")
1115
1116
1117 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1118 """Check the sanity of iallocator and node arguments and use the
1119 cluster-wide iallocator if appropriate.
1120
1121 Check that at most one of (iallocator, node) is specified. If none is
1122 specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1123 then the LU's opcode's iallocator slot is filled with the cluster-wide
1124 default iallocator.
1125
1126 @type iallocator_slot: string
1127 @param iallocator_slot: the name of the opcode iallocator slot
1128 @type node_slot: string
1129 @param node_slot: the name of the opcode target node slot
1130
1131 """
1132 node = getattr(lu.op, node_slot, None)
1133 ialloc = getattr(lu.op, iallocator_slot, None)
1134 if node == []:
1135 node = None
1136
1137 if node is not None and ialloc is not None:
1138 raise errors.OpPrereqError("Do not specify both, iallocator and node",
1139 errors.ECODE_INVAL)
1140 elif ((node is None and ialloc is None) or
1141 ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1142 default_iallocator = lu.cfg.GetDefaultIAllocator()
1143 if default_iallocator:
1144 setattr(lu.op, iallocator_slot, default_iallocator)
1145 else:
1146 raise errors.OpPrereqError("No iallocator or node given and no"
1147 " cluster-wide default iallocator found;"
1148 " please specify either an iallocator or a"
1149 " node, or set a cluster-wide default"
1150 " iallocator", errors.ECODE_INVAL)
1151
1152
1168
1169
1170 def CheckNodeOnline(lu, node_uuid, msg=None):
1171 """Ensure that a given node is online.
1172
1173 @param lu: the LU on behalf of which we make the check
1174 @param node_uuid: the node to check
1175 @param msg: if passed, should be a message to replace the default one
1176 @raise errors.OpPrereqError: if the node is offline
1177
1178 """
1179 if msg is None:
1180 msg = "Can't use offline node"
1181 if lu.cfg.GetNodeInfo(node_uuid).offline:
1182 raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
1183 errors.ECODE_STATE)
1184
1185
1186 def CheckDiskTemplateEnabled(cluster, disk_template):
1187 """Helper function to check if a disk template is enabled.
1188
1189 @type cluster: C{objects.Cluster}
1190 @param cluster: the cluster's configuration
1191 @type disk_template: str
1192 @param disk_template: the disk template to be checked
1193
1194 """
1195 assert disk_template is not None
1196 if disk_template not in constants.DISK_TEMPLATES:
1197 raise errors.OpPrereqError("'%s' is not a valid disk template."
1198 " Valid disk templates are: %s" %
1199 (disk_template,
1200 ",".join(constants.DISK_TEMPLATES)))
1201 if not disk_template in cluster.enabled_disk_templates:
1202 raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
1203 " Enabled disk templates are: %s" %
1204 (disk_template,
1205 ",".join(cluster.enabled_disk_templates)))
1206
1207
1208 def CheckStorageTypeEnabled(cluster, storage_type):
1209 """Helper function to check if a storage type is enabled.
1210
1211 @type cluster: C{objects.Cluster}
1212 @param cluster: the cluster's configuration
1213 @type storage_type: str
1214 @param storage_type: the storage type to be checked
1215
1216 """
1217 assert storage_type is not None
1218 assert storage_type in constants.STORAGE_TYPES
1219
1220
1221 if storage_type == constants.ST_LVM_PV:
1222 CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
1223 else:
1224 possible_disk_templates = \
1225 utils.storage.GetDiskTemplatesOfStorageTypes(storage_type)
1226 for disk_template in possible_disk_templates:
1227 if disk_template in cluster.enabled_disk_templates:
1228 return
1229 raise errors.OpPrereqError("No disk template of storage type '%s' is"
1230 " enabled in this cluster. Enabled disk"
1231 " templates are: %s" % (storage_type,
1232 ",".join(cluster.enabled_disk_templates)))
1233
1234
1235 def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
1236 """Checks ipolicy disk templates against enabled disk tempaltes.
1237
1238 @type ipolicy: dict
1239 @param ipolicy: the new ipolicy
1240 @type enabled_disk_templates: list of string
1241 @param enabled_disk_templates: list of enabled disk templates on the
1242 cluster
1243 @raises errors.OpPrereqError: if there is at least one allowed disk
1244 template that is not also enabled.
1245
1246 """
1247 assert constants.IPOLICY_DTS in ipolicy
1248 allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
1249 not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
1250 if not_enabled:
1251 raise errors.OpPrereqError("The following disk templates are allowed"
1252 " by the ipolicy, but not enabled on the"
1253 " cluster: %s" % utils.CommaJoin(not_enabled),
1254 errors.ECODE_INVAL)
1255
1256
1274
1275
1276 def CheckDiskAccessModeConsistency(parameters, cfg, group=None):
1277 """Checks if the access param is consistent with the cluster configuration.
1278
1279 @note: requires a configuration lock to run.
1280 @param parameters: the parameters to validate
1281 @param cfg: the cfg object of the cluster
1282 @param group: if set, only check for consistency within this group.
1283 @raise errors.OpPrereqError: if the LU attempts to change the access parameter
1284 to an invalid value, such as "pink bunny".
1285 @raise errors.OpPrereqError: if the LU attempts to change the access parameter
1286 to an inconsistent value, such as asking for RBD
1287 userspace access to the chroot hypervisor.
1288
1289 """
1290 CheckDiskAccessModeValidity(parameters)
1291
1292 for disk_template in parameters:
1293 access = parameters[disk_template].get(constants.LDP_ACCESS,
1294 constants.DISK_KERNELSPACE)
1295
1296 if disk_template not in constants.DTS_HAVE_ACCESS:
1297 continue
1298
1299
1300
1301 inst_uuids = cfg.GetNodeGroupInstances(group) if group else \
1302 cfg.GetInstanceList()
1303
1304 for entry in inst_uuids:
1305 inst = cfg.GetInstanceInfo(entry)
1306 disks = cfg.GetInstanceDisks(entry)
1307 for disk in disks:
1308
1309 if disk.dev_type != disk_template:
1310 continue
1311
1312 hv = inst.hypervisor
1313
1314 if not IsValidDiskAccessModeCombination(hv, disk.dev_type, access):
1315 raise errors.OpPrereqError("Instance {i}: cannot use '{a}' access"
1316 " setting with {h} hypervisor and {d} disk"
1317 " type.".format(i=inst.name,
1318 a=access,
1319 h=hv,
1320 d=disk.dev_type))
1321
1322
1323 def IsValidDiskAccessModeCombination(hv, disk_template, mode):
1324 """Checks if an hypervisor can read a disk template with given mode.
1325
1326 @param hv: the hypervisor that will access the data
1327 @param disk_template: the disk template the data is stored as
1328 @param mode: how the hypervisor should access the data
1329 @return: True if the hypervisor can read a given disk_template
1330 in the specified mode.
1331
1332 """
1333 if mode == constants.DISK_KERNELSPACE:
1334 return True
1335
1336 if (hv == constants.HT_KVM and
1337 disk_template in constants.DTS_HAVE_ACCESS and
1338 mode == constants.DISK_USERSPACE):
1339 return True
1340
1341
1342 return False
1343
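# Illustrative sketch (not part of the original module; the constants below
# are examples only): kernelspace access is always accepted, while userspace
# access is only accepted for KVM with a disk template that supports it:
#
#   IsValidDiskAccessModeCombination(constants.HT_KVM, constants.DT_RBD,
#                                    constants.DISK_USERSPACE)    # True
#   IsValidDiskAccessModeCombination(constants.HT_XEN_PVM, constants.DT_RBD,
#                                    constants.DISK_USERSPACE)    # False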
1344
1366
1367
1368 def RemoveNodeCertFromCandidateCerts(cfg, node_uuid):
1369 """Removes the node's certificate from the candidate certificates list.
1370
1371 @type cfg: C{config.ConfigWriter}
1372 @param cfg: the cluster's configuration
1373 @type node_uuid: string
1374 @param node_uuid: the node's UUID
1375
1376 """
1377 cfg.RemoveNodeFromCandidateCerts(node_uuid)
1378
1379
1404
1405
1429
1430
1432 """Create an OpCode that connects a group to the instance
1433 communication network.
1434
1435 This OpCode contains the configuration necessary for the instance
1436 communication network.
1437
1438 @type group_uuid: string
1439 @param group_uuid: UUID of the group to connect
1440
1441 @type network: string
1442 @param network: name or UUID of the network to connect to, i.e., the
1443 instance communication network
1444
1445 @rtype: L{ganeti.opcodes.OpCode}
1446 @return: OpCode that connects the group to the instance
1447 communication network
1448
1449 """
1450 return opcodes.OpNetworkConnect(
1451 group_name=group_uuid,
1452 network_name=network,
1453 network_mode=constants.INSTANCE_COMMUNICATION_NETWORK_MODE,
1454 network_link=constants.INSTANCE_COMMUNICATION_NETWORK_LINK,
1455 conflicts_check=True)
1456
1457
1458 def DetermineImageSize(lu, image, node_uuid):
1459 """Determines the size of the specified image.
1460
1461 @type image: string
1462 @param image: absolute filepath or URL of the image
1463
1464 @type node_uuid: string
1465 @param node_uuid: if L{image} is a filepath, this is the UUID of the
1466 node where the image is located
1467
1468 @rtype: int
1469 @return: size of the image in MB, rounded up
1470 @raise OpExecError: if the image does not exist
1471
1472 """
1473
1474 class _HeadRequest(urllib2.Request):
1475 def get_method(self):
1476 return "HEAD"
1477
1478 if utils.IsUrl(image):
1479 try:
1480 response = urllib2.urlopen(_HeadRequest(image))
1481 except urllib2.URLError:
1482 raise errors.OpExecError("Could not retrieve image from given url '%s'" %
1483 image)
1484
1485 content_length_str = response.info().getheader('content-length')
1486
1487 if not content_length_str:
1488 raise errors.OpExecError("Could not determine image size from given url"
1489 " '%s'" % image)
1490
1491 byte_size = int(content_length_str)
1492 else:
1493
1494 result = lu.rpc.call_get_file_info(node_uuid, image)
1495 result.Raise("Could not determine size of file '%s'" % image)
1496
1497 success, attributes = result.payload
1498 if not success:
1499 raise errors.OpExecError("Could not open file '%s'" % image)
1500 byte_size = attributes[constants.STAT_SIZE]
1501
1502
1503 return math.ceil(byte_size / 1024. / 1024.)
1504
1505
1506 def EnsureKvmdOnNodes(lu, feedback_fn, nodes=None):
1507 """Ensure KVM daemon is running on nodes with KVM instances.
1508
1509 If user shutdown is enabled in the cluster:
1510 - The KVM daemon will be started on VM capable nodes containing
1511 KVM instances.
1512 - The KVM daemon will be stopped on non VM capable nodes.
1513
1514 If user shutdown is disabled in the cluster:
1515 - The KVM daemon will be stopped on all nodes
1516
1517 Issues a warning for each failed RPC call.
1518
1519 @type lu: L{LogicalUnit}
1520 @param lu: logical unit on whose behalf we execute
1521
1522 @type feedback_fn: callable
1523 @param feedback_fn: feedback function
1524
1525 @type nodes: list of string
1526 @param nodes: if supplied, it overrides the node uuids to start/stop;
1527 this is used mainly for optimization
1528
1529 """
1530 cluster = lu.cfg.GetClusterInfo()
1531
1532
1533 if nodes is not None:
1534 node_uuids = set(nodes)
1535 else:
1536 node_uuids = lu.cfg.GetNodeList()
1537
1538
1539 if constants.HT_KVM in cluster.enabled_hypervisors and \
1540 cluster.enabled_user_shutdown:
1541 start_nodes = []
1542 stop_nodes = []
1543
1544 for node_uuid in node_uuids:
1545 if lu.cfg.GetNodeInfo(node_uuid).vm_capable:
1546 start_nodes.append(node_uuid)
1547 else:
1548 stop_nodes.append(node_uuid)
1549 else:
1550 start_nodes = []
1551 stop_nodes = node_uuids
1552
1553
1554 if start_nodes:
1555 results = lu.rpc.call_node_ensure_daemon(start_nodes, constants.KVMD, True)
1556 for node_uuid in start_nodes:
1557 results[node_uuid].Warn("Failed to start KVM daemon in node '%s'" %
1558 node_uuid, feedback_fn)
1559
1560
1561 if stop_nodes:
1562 results = lu.rpc.call_node_ensure_daemon(stop_nodes, constants.KVMD, False)
1563 for node_uuid in stop_nodes:
1564 results[node_uuid].Warn("Failed to stop KVM daemon in node '%s'" %
1565 node_uuid, feedback_fn)
1566
1567
1568 def WarnAboutFailedSshUpdates(result, master_uuid, feedback_fn):
1569 node_errors = result[master_uuid].payload
1570 if node_errors:
1571 feedback_fn("Some nodes' SSH key files could not be updated:")
1572 for node_name, error_msg in node_errors:
1573 feedback_fn("%s: %s" % (node_name, error_msg))
1574