Package ganeti :: Package cmdlib :: Module common
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.common

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Common functions used by multiple logical units.""" 
  23   
  24  import copy 
  25  import os 
  26   
  27  from ganeti import compat 
  28  from ganeti import constants 
  29  from ganeti import errors 
  30  from ganeti import hypervisor 
  31  from ganeti import locking 
  32  from ganeti import objects 
  33  from ganeti import opcodes 
  34  from ganeti import pathutils 
  35  from ganeti import rpc 
  36  from ganeti import ssconf 
  37  from ganeti import utils 
  38   
  39   
  40  # States of instance 
  41  INSTANCE_DOWN = [constants.ADMINST_DOWN] 
  42  INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] 
  43  INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] 
  44   
  45  #: Instance status in which an instance can be marked as offline/online 
  46  CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([ 
  47    constants.ADMINST_OFFLINE, 
  48    ])) 
  49   
  50   
51 -def _ExpandItemName(expand_fn, name, kind):
52 """Expand an item name. 53 54 @param expand_fn: the function to use for expansion 55 @param name: requested item name 56 @param kind: text description ('Node' or 'Instance') 57 @return: the result of the expand_fn, if successful 58 @raise errors.OpPrereqError: if the item is not found 59 60 """ 61 (uuid, full_name) = expand_fn(name) 62 if uuid is None or full_name is None: 63 raise errors.OpPrereqError("%s '%s' not known" % (kind, name), 64 errors.ECODE_NOENT) 65 return (uuid, full_name)
66 67
68 -def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
69 """Wrapper over L{_ExpandItemName} for instance.""" 70 (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance") 71 if expected_uuid is not None and uuid != expected_uuid: 72 raise errors.OpPrereqError( 73 "The instances UUID '%s' does not match the expected UUID '%s' for" 74 " instance '%s'. Maybe the instance changed since you submitted this" 75 " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE) 76 return (uuid, full_name)
77 78
79 -def ExpandNodeUuidAndName(cfg, expected_uuid, name):
80 """Expand a short node name into the node UUID and full name. 81 82 @type cfg: L{config.ConfigWriter} 83 @param cfg: The cluster configuration 84 @type expected_uuid: string 85 @param expected_uuid: expected UUID for the node (or None if there is no 86 expectation). If it does not match, a L{errors.OpPrereqError} is 87 raised. 88 @type name: string 89 @param name: the short node name 90 91 """ 92 (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node") 93 if expected_uuid is not None and uuid != expected_uuid: 94 raise errors.OpPrereqError( 95 "The nodes UUID '%s' does not match the expected UUID '%s' for node" 96 " '%s'. Maybe the node changed since you submitted this job." % 97 (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE) 98 return (uuid, full_name)
99 100
101 -def ShareAll():
102 """Returns a dict declaring all lock levels shared. 103 104 """ 105 return dict.fromkeys(locking.LEVELS, 1)
106 107
108 -def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
109 """Checks if the instances in a node group are still correct. 110 111 @type cfg: L{config.ConfigWriter} 112 @param cfg: The cluster configuration 113 @type group_uuid: string 114 @param group_uuid: Node group UUID 115 @type owned_instance_names: set or frozenset 116 @param owned_instance_names: List of currently owned instances 117 118 """ 119 wanted_instances = frozenset(cfg.GetInstanceNames( 120 cfg.GetNodeGroupInstances(group_uuid))) 121 if owned_instance_names != wanted_instances: 122 raise errors.OpPrereqError("Instances in node group '%s' changed since" 123 " locks were acquired, wanted '%s', have '%s';" 124 " retry the operation" % 125 (group_uuid, 126 utils.CommaJoin(wanted_instances), 127 utils.CommaJoin(owned_instance_names)), 128 errors.ECODE_STATE) 129 130 return wanted_instances
131 132
133 -def GetWantedNodes(lu, short_node_names):
134 """Returns list of checked and expanded node names. 135 136 @type lu: L{LogicalUnit} 137 @param lu: the logical unit on whose behalf we execute 138 @type short_node_names: list 139 @param short_node_names: list of node names or None for all nodes 140 @rtype: tuple of lists 141 @return: tupe with (list of node UUIDs, list of node names) 142 @raise errors.ProgrammerError: if the nodes parameter is wrong type 143 144 """ 145 if short_node_names: 146 node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0] 147 for name in short_node_names] 148 else: 149 node_uuids = lu.cfg.GetNodeList() 150 151 return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
152 153
154 -def GetWantedInstances(lu, short_inst_names):
155 """Returns list of checked and expanded instance names. 156 157 @type lu: L{LogicalUnit} 158 @param lu: the logical unit on whose behalf we execute 159 @type short_inst_names: list 160 @param short_inst_names: list of instance names or None for all instances 161 @rtype: tuple of lists 162 @return: tuple of (instance UUIDs, instance names) 163 @raise errors.OpPrereqError: if the instances parameter is wrong type 164 @raise errors.OpPrereqError: if any of the passed instances is not found 165 166 """ 167 if short_inst_names: 168 inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0] 169 for name in short_inst_names] 170 else: 171 inst_uuids = lu.cfg.GetInstanceList() 172 return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])
173 174
175 -def RunPostHook(lu, node_name):
176 """Runs the post-hook for an opcode on a single node. 177 178 """ 179 hm = lu.proc.BuildHooksManager(lu) 180 try: 181 hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name]) 182 except Exception, err: # pylint: disable=W0703 183 lu.LogWarning("Errors occurred running hooks on %s: %s", 184 node_name, err)
185 186
187 -def RedistributeAncillaryFiles(lu):
188 """Distribute additional files which are part of the cluster configuration. 189 190 ConfigWriter takes care of distributing the config and ssconf files, but 191 there are more files which should be distributed to all nodes. This function 192 makes sure those are copied. 193 194 """ 195 # Gather target nodes 196 cluster = lu.cfg.GetClusterInfo() 197 master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) 198 199 online_node_uuids = lu.cfg.GetOnlineNodeList() 200 online_node_uuid_set = frozenset(online_node_uuids) 201 vm_node_uuids = list(online_node_uuid_set.intersection( 202 lu.cfg.GetVmCapableNodeList())) 203 204 # Never distribute to master node 205 for node_uuids in [online_node_uuids, vm_node_uuids]: 206 if master_info.uuid in node_uuids: 207 node_uuids.remove(master_info.uuid) 208 209 # Gather file lists 210 (files_all, _, files_mc, files_vm) = \ 211 ComputeAncillaryFiles(cluster, True) 212 213 # Never re-distribute configuration file from here 214 assert not (pathutils.CLUSTER_CONF_FILE in files_all or 215 pathutils.CLUSTER_CONF_FILE in files_vm) 216 assert not files_mc, "Master candidates not handled in this function" 217 218 filemap = [ 219 (online_node_uuids, files_all), 220 (vm_node_uuids, files_vm), 221 ] 222 223 # Upload the files 224 for (node_uuids, files) in filemap: 225 for fname in files: 226 UploadHelper(lu, node_uuids, fname)
227 228
229 -def ComputeAncillaryFiles(cluster, redist):
230 """Compute files external to Ganeti which need to be consistent. 231 232 @type redist: boolean 233 @param redist: Whether to include files which need to be redistributed 234 235 """ 236 # Compute files for all nodes 237 files_all = set([ 238 pathutils.SSH_KNOWN_HOSTS_FILE, 239 pathutils.CONFD_HMAC_KEY, 240 pathutils.CLUSTER_DOMAIN_SECRET_FILE, 241 pathutils.SPICE_CERT_FILE, 242 pathutils.SPICE_CACERT_FILE, 243 pathutils.RAPI_USERS_FILE, 244 ]) 245 246 if redist: 247 # we need to ship at least the RAPI certificate 248 files_all.add(pathutils.RAPI_CERT_FILE) 249 else: 250 files_all.update(pathutils.ALL_CERT_FILES) 251 files_all.update(ssconf.SimpleStore().GetFileList()) 252 253 if cluster.modify_etc_hosts: 254 files_all.add(pathutils.ETC_HOSTS) 255 256 if cluster.use_external_mip_script: 257 files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT) 258 259 # Files which are optional, these must: 260 # - be present in one other category as well 261 # - either exist or not exist on all nodes of that category (mc, vm all) 262 files_opt = set([ 263 pathutils.RAPI_USERS_FILE, 264 ]) 265 266 # Files which should only be on master candidates 267 files_mc = set() 268 269 if not redist: 270 files_mc.add(pathutils.CLUSTER_CONF_FILE) 271 272 # File storage 273 if (not redist and (cluster.IsFileStorageEnabled() or 274 cluster.IsSharedFileStorageEnabled())): 275 files_all.add(pathutils.FILE_STORAGE_PATHS_FILE) 276 files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE) 277 278 # Files which should only be on VM-capable nodes 279 files_vm = set( 280 filename 281 for hv_name in cluster.enabled_hypervisors 282 for filename in 283 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0]) 284 285 files_opt |= set( 286 filename 287 for hv_name in cluster.enabled_hypervisors 288 for filename in 289 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1]) 290 291 # Filenames in each category must be unique 292 all_files_set = files_all | files_mc | files_vm 293 assert (len(all_files_set) == 294 sum(map(len, [files_all, files_mc, files_vm]))), \ 295 "Found file listed in more than one file list" 296 297 # Optional files must be present in one other category 298 assert all_files_set.issuperset(files_opt), \ 299 "Optional file not in a different required list" 300 301 # This one file should never ever be re-distributed via RPC 302 assert not (redist and 303 pathutils.FILE_STORAGE_PATHS_FILE in all_files_set) 304 305 return (files_all, files_opt, files_mc, files_vm)
306 307
308 -def UploadHelper(lu, node_uuids, fname):
309 """Helper for uploading a file and showing warnings. 310 311 """ 312 if os.path.exists(fname): 313 result = lu.rpc.call_upload_file(node_uuids, fname) 314 for to_node_uuids, to_result in result.items(): 315 msg = to_result.fail_msg 316 if msg: 317 msg = ("Copy of file %s to node %s failed: %s" % 318 (fname, lu.cfg.GetNodeName(to_node_uuids), msg)) 319 lu.LogWarning(msg)
320 321
322 -def MergeAndVerifyHvState(op_input, obj_input):
323 """Combines the hv state from an opcode with the one of the object 324 325 @param op_input: The input dict from the opcode 326 @param obj_input: The input dict from the objects 327 @return: The verified and updated dict 328 329 """ 330 if op_input: 331 invalid_hvs = set(op_input) - constants.HYPER_TYPES 332 if invalid_hvs: 333 raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:" 334 " %s" % utils.CommaJoin(invalid_hvs), 335 errors.ECODE_INVAL) 336 if obj_input is None: 337 obj_input = {} 338 type_check = constants.HVSTS_PARAMETER_TYPES 339 return _UpdateAndVerifySubDict(obj_input, op_input, type_check) 340 341 return None
342 343
344 -def MergeAndVerifyDiskState(op_input, obj_input):
345 """Combines the disk state from an opcode with the one of the object 346 347 @param op_input: The input dict from the opcode 348 @param obj_input: The input dict from the objects 349 @return: The verified and updated dict 350 """ 351 if op_input: 352 invalid_dst = set(op_input) - constants.DS_VALID_TYPES 353 if invalid_dst: 354 raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" % 355 utils.CommaJoin(invalid_dst), 356 errors.ECODE_INVAL) 357 type_check = constants.DSS_PARAMETER_TYPES 358 if obj_input is None: 359 obj_input = {} 360 return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value, 361 type_check)) 362 for key, value in op_input.items()) 363 364 return None
365 366
367 -def CheckOSParams(lu, required, node_uuids, osname, osparams):
368 """OS parameters validation. 369 370 @type lu: L{LogicalUnit} 371 @param lu: the logical unit for which we check 372 @type required: boolean 373 @param required: whether the validation should fail if the OS is not 374 found 375 @type node_uuids: list 376 @param node_uuids: the list of nodes on which we should check 377 @type osname: string 378 @param osname: the name of the hypervisor we should use 379 @type osparams: dict 380 @param osparams: the parameters which we need to check 381 @raise errors.OpPrereqError: if the parameters are not valid 382 383 """ 384 node_uuids = _FilterVmNodes(lu, node_uuids) 385 result = lu.rpc.call_os_validate(node_uuids, required, osname, 386 [constants.OS_VALIDATE_PARAMETERS], 387 osparams) 388 for node_uuid, nres in result.items(): 389 # we don't check for offline cases since this should be run only 390 # against the master node and/or an instance's nodes 391 nres.Raise("OS Parameters validation failed on node %s" % 392 lu.cfg.GetNodeName(node_uuid)) 393 if not nres.payload: 394 lu.LogInfo("OS %s not found on node %s, validation skipped", 395 osname, lu.cfg.GetNodeName(node_uuid))
396 397
398 -def CheckHVParams(lu, node_uuids, hvname, hvparams):
399 """Hypervisor parameter validation. 400 401 This function abstract the hypervisor parameter validation to be 402 used in both instance create and instance modify. 403 404 @type lu: L{LogicalUnit} 405 @param lu: the logical unit for which we check 406 @type node_uuids: list 407 @param node_uuids: the list of nodes on which we should check 408 @type hvname: string 409 @param hvname: the name of the hypervisor we should use 410 @type hvparams: dict 411 @param hvparams: the parameters which we need to check 412 @raise errors.OpPrereqError: if the parameters are not valid 413 414 """ 415 node_uuids = _FilterVmNodes(lu, node_uuids) 416 417 cluster = lu.cfg.GetClusterInfo() 418 hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) 419 420 hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull) 421 for node_uuid in node_uuids: 422 info = hvinfo[node_uuid] 423 if info.offline: 424 continue 425 info.Raise("Hypervisor parameter validation failed on node %s" % 426 lu.cfg.GetNodeName(node_uuid))
427 428
429 -def AdjustCandidatePool(lu, exceptions):
430 """Adjust the candidate pool after node operations. 431 432 """ 433 mod_list = lu.cfg.MaintainCandidatePool(exceptions) 434 if mod_list: 435 lu.LogInfo("Promoted nodes to master candidate role: %s", 436 utils.CommaJoin(node.name for node in mod_list)) 437 for node in mod_list: 438 lu.context.ReaddNode(node) 439 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) 440 if mc_now > mc_max: 441 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % 442 (mc_now, mc_max))
443 444
445 -def CheckNodePVs(nresult, exclusive_storage):
446 """Check node PVs. 447 448 """ 449 pvlist_dict = nresult.get(constants.NV_PVLIST, None) 450 if pvlist_dict is None: 451 return (["Can't get PV list from node"], None) 452 pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict) 453 errlist = [] 454 # check that ':' is not present in PV names, since it's a 455 # special character for lvcreate (denotes the range of PEs to 456 # use on the PV) 457 for pv in pvlist: 458 if ":" in pv.name: 459 errlist.append("Invalid character ':' in PV '%s' of VG '%s'" % 460 (pv.name, pv.vg_name)) 461 es_pvinfo = None 462 if exclusive_storage: 463 (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist) 464 errlist.extend(errmsgs) 465 shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None) 466 if shared_pvs: 467 for (pvname, lvlist) in shared_pvs: 468 # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...) 469 errlist.append("PV %s is shared among unrelated LVs (%s)" % 470 (pvname, utils.CommaJoin(lvlist))) 471 return (errlist, es_pvinfo)
472 473
474 -def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
475 """Computes if value is in the desired range. 476 477 @param name: name of the parameter for which we perform the check 478 @param qualifier: a qualifier used in the error message (e.g. 'disk/1', 479 not just 'disk') 480 @param ispecs: dictionary containing min and max values 481 @param value: actual value that we want to use 482 @return: None or an error string 483 484 """ 485 if value in [None, constants.VALUE_AUTO]: 486 return None 487 max_v = ispecs[constants.ISPECS_MAX].get(name, value) 488 min_v = ispecs[constants.ISPECS_MIN].get(name, value) 489 if value > max_v or min_v > value: 490 if qualifier: 491 fqn = "%s/%s" % (name, qualifier) 492 else: 493 fqn = name 494 return ("%s value %s is not in range [%s, %s]" % 495 (fqn, value, min_v, max_v)) 496 return None
497 498
499 -def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count, 500 nic_count, disk_sizes, spindle_use, 501 disk_template, 502 _compute_fn=_ComputeMinMaxSpec):
503 """Verifies ipolicy against provided specs. 504 505 @type ipolicy: dict 506 @param ipolicy: The ipolicy 507 @type mem_size: int 508 @param mem_size: The memory size 509 @type cpu_count: int 510 @param cpu_count: Used cpu cores 511 @type disk_count: int 512 @param disk_count: Number of disks used 513 @type nic_count: int 514 @param nic_count: Number of nics used 515 @type disk_sizes: list of ints 516 @param disk_sizes: Disk sizes of used disk (len must match C{disk_count}) 517 @type spindle_use: int 518 @param spindle_use: The number of spindles this instance uses 519 @type disk_template: string 520 @param disk_template: The disk template of the instance 521 @param _compute_fn: The compute function (unittest only) 522 @return: A list of violations, or an empty list of no violations are found 523 524 """ 525 assert disk_count == len(disk_sizes) 526 527 test_settings = [ 528 (constants.ISPEC_MEM_SIZE, "", mem_size), 529 (constants.ISPEC_CPU_COUNT, "", cpu_count), 530 (constants.ISPEC_NIC_COUNT, "", nic_count), 531 (constants.ISPEC_SPINDLE_USE, "", spindle_use), 532 ] + [(constants.ISPEC_DISK_SIZE, str(idx), d) 533 for idx, d in enumerate(disk_sizes)] 534 if disk_template != constants.DT_DISKLESS: 535 # This check doesn't make sense for diskless instances 536 test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count)) 537 ret = [] 538 allowed_dts = ipolicy[constants.IPOLICY_DTS] 539 if disk_template not in allowed_dts: 540 ret.append("Disk template %s is not allowed (allowed templates: %s)" % 541 (disk_template, utils.CommaJoin(allowed_dts))) 542 543 min_errs = None 544 for minmax in ipolicy[constants.ISPECS_MINMAX]: 545 errs = filter(None, 546 (_compute_fn(name, qualifier, minmax, value) 547 for (name, qualifier, value) in test_settings)) 548 if min_errs is None or len(errs) < len(min_errs): 549 min_errs = errs 550 assert min_errs is not None 551 return ret + min_errs
552 553
554 -def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg, 555 _compute_fn=ComputeIPolicySpecViolation):
556 """Compute if instance meets the specs of ipolicy. 557 558 @type ipolicy: dict 559 @param ipolicy: The ipolicy to verify against 560 @type instance: L{objects.Instance} 561 @param instance: The instance to verify 562 @type cfg: L{config.ConfigWriter} 563 @param cfg: Cluster configuration 564 @param _compute_fn: The function to verify ipolicy (unittest only) 565 @see: L{ComputeIPolicySpecViolation} 566 567 """ 568 ret = [] 569 be_full = cfg.GetClusterInfo().FillBE(instance) 570 mem_size = be_full[constants.BE_MAXMEM] 571 cpu_count = be_full[constants.BE_VCPUS] 572 es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes) 573 if any(es_flags.values()): 574 # With exclusive storage use the actual spindles 575 try: 576 spindle_use = sum([disk.spindles for disk in instance.disks]) 577 except TypeError: 578 ret.append("Number of spindles not configured for disks of instance %s" 579 " while exclusive storage is enabled, try running gnt-cluster" 580 " repair-disk-sizes" % instance.name) 581 # _ComputeMinMaxSpec ignores 'None's 582 spindle_use = None 583 else: 584 spindle_use = be_full[constants.BE_SPINDLE_USE] 585 disk_count = len(instance.disks) 586 disk_sizes = [disk.size for disk in instance.disks] 587 nic_count = len(instance.nics) 588 disk_template = instance.disk_template 589 590 return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count, 591 disk_sizes, spindle_use, disk_template)
592 593
594 -def _ComputeViolatingInstances(ipolicy, instances, cfg):
595 """Computes a set of instances who violates given ipolicy. 596 597 @param ipolicy: The ipolicy to verify 598 @type instances: L{objects.Instance} 599 @param instances: List of instances to verify 600 @type cfg: L{config.ConfigWriter} 601 @param cfg: Cluster configuration 602 @return: A frozenset of instance names violating the ipolicy 603 604 """ 605 return frozenset([inst.name for inst in instances 606 if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
607 608
609 -def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
610 """Computes a set of any instances that would violate the new ipolicy. 611 612 @param old_ipolicy: The current (still in-place) ipolicy 613 @param new_ipolicy: The new (to become) ipolicy 614 @param instances: List of instances to verify 615 @type cfg: L{config.ConfigWriter} 616 @param cfg: Cluster configuration 617 @return: A list of instances which violates the new ipolicy but 618 did not before 619 620 """ 621 return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) - 622 _ComputeViolatingInstances(old_ipolicy, instances, cfg))
623 624
625 -def GetUpdatedParams(old_params, update_dict, 626 use_default=True, use_none=False):
627 """Return the new version of a parameter dictionary. 628 629 @type old_params: dict 630 @param old_params: old parameters 631 @type update_dict: dict 632 @param update_dict: dict containing new parameter values, or 633 constants.VALUE_DEFAULT to reset the parameter to its default 634 value 635 @param use_default: boolean 636 @type use_default: whether to recognise L{constants.VALUE_DEFAULT} 637 values as 'to be deleted' values 638 @param use_none: boolean 639 @type use_none: whether to recognise C{None} values as 'to be 640 deleted' values 641 @rtype: dict 642 @return: the new parameter dictionary 643 644 """ 645 params_copy = copy.deepcopy(old_params) 646 for key, val in update_dict.iteritems(): 647 if ((use_default and val == constants.VALUE_DEFAULT) or 648 (use_none and val is None)): 649 try: 650 del params_copy[key] 651 except KeyError: 652 pass 653 else: 654 params_copy[key] = val 655 return params_copy
656 657
658 -def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
659 """Return the new version of an instance policy. 660 661 @param group_policy: whether this policy applies to a group and thus 662 we should support removal of policy entries 663 664 """ 665 ipolicy = copy.deepcopy(old_ipolicy) 666 for key, value in new_ipolicy.items(): 667 if key not in constants.IPOLICY_ALL_KEYS: 668 raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key, 669 errors.ECODE_INVAL) 670 if (not value or value == [constants.VALUE_DEFAULT] or 671 value == constants.VALUE_DEFAULT): 672 if group_policy: 673 if key in ipolicy: 674 del ipolicy[key] 675 else: 676 raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'" 677 " on the cluster'" % key, 678 errors.ECODE_INVAL) 679 else: 680 if key in constants.IPOLICY_PARAMETERS: 681 # FIXME: we assume all such values are float 682 try: 683 ipolicy[key] = float(value) 684 except (TypeError, ValueError), err: 685 raise errors.OpPrereqError("Invalid value for attribute" 686 " '%s': '%s', error: %s" % 687 (key, value, err), errors.ECODE_INVAL) 688 elif key == constants.ISPECS_MINMAX: 689 for minmax in value: 690 for k in minmax.keys(): 691 utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES) 692 ipolicy[key] = value 693 elif key == constants.ISPECS_STD: 694 if group_policy: 695 msg = "%s cannot appear in group instance specs" % key 696 raise errors.OpPrereqError(msg, errors.ECODE_INVAL) 697 ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value, 698 use_none=False, use_default=False) 699 utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES) 700 else: 701 # FIXME: we assume all others are lists; this should be redone 702 # in a nicer way 703 ipolicy[key] = list(value) 704 try: 705 objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy) 706 except errors.ConfigurationError, err: 707 raise errors.OpPrereqError("Invalid instance policy: %s" % err, 708 errors.ECODE_INVAL) 709 return ipolicy
710 711
712 -def AnnotateDiskParams(instance, devs, cfg):
713 """Little helper wrapper to the rpc annotation method. 714 715 @param instance: The instance object 716 @type devs: List of L{objects.Disk} 717 @param devs: The root devices (not any of its children!) 718 @param cfg: The config object 719 @returns The annotated disk copies 720 @see L{rpc.AnnotateDiskParams} 721 722 """ 723 return rpc.AnnotateDiskParams(instance.disk_template, devs, 724 cfg.GetInstanceDiskParams(instance))
725 726
727 -def SupportsOob(cfg, node):
728 """Tells if node supports OOB. 729 730 @type cfg: L{config.ConfigWriter} 731 @param cfg: The cluster configuration 732 @type node: L{objects.Node} 733 @param node: The node 734 @return: The OOB script if supported or an empty string otherwise 735 736 """ 737 return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
738 739
740 -def _UpdateAndVerifySubDict(base, updates, type_check):
741 """Updates and verifies a dict with sub dicts of the same type. 742 743 @param base: The dict with the old data 744 @param updates: The dict with the new data 745 @param type_check: Dict suitable to ForceDictType to verify correct types 746 @returns: A new dict with updated and verified values 747 748 """ 749 def fn(old, value): 750 new = GetUpdatedParams(old, value) 751 utils.ForceDictType(new, type_check) 752 return new
753 754 ret = copy.deepcopy(base) 755 ret.update(dict((key, fn(base.get(key, {}), value)) 756 for key, value in updates.items())) 757 return ret 758 759
760 -def _FilterVmNodes(lu, node_uuids):
761 """Filters out non-vm_capable nodes from a list. 762 763 @type lu: L{LogicalUnit} 764 @param lu: the logical unit for which we check 765 @type node_uuids: list 766 @param node_uuids: the list of nodes on which we should check 767 @rtype: list 768 @return: the list of vm-capable nodes 769 770 """ 771 vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList()) 772 return [uuid for uuid in node_uuids if uuid not in vm_nodes]
773 774
775 -def GetDefaultIAllocator(cfg, ialloc):
776 """Decides on which iallocator to use. 777 778 @type cfg: L{config.ConfigWriter} 779 @param cfg: Cluster configuration object 780 @type ialloc: string or None 781 @param ialloc: Iallocator specified in opcode 782 @rtype: string 783 @return: Iallocator name 784 785 """ 786 if not ialloc: 787 # Use default iallocator 788 ialloc = cfg.GetDefaultIAllocator() 789 790 if not ialloc: 791 raise errors.OpPrereqError("No iallocator was specified, neither in the" 792 " opcode nor as a cluster-wide default", 793 errors.ECODE_INVAL) 794 795 return ialloc
796 797
798 -def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids, 799 cur_group_uuid):
800 """Checks if node groups for locked instances are still correct. 801 802 @type cfg: L{config.ConfigWriter} 803 @param cfg: Cluster configuration 804 @type instances: dict; string as key, L{objects.Instance} as value 805 @param instances: Dictionary, instance UUID as key, instance object as value 806 @type owned_groups: iterable of string 807 @param owned_groups: List of owned groups 808 @type owned_node_uuids: iterable of string 809 @param owned_node_uuids: List of owned nodes 810 @type cur_group_uuid: string or None 811 @param cur_group_uuid: Optional group UUID to check against instance's groups 812 813 """ 814 for (uuid, inst) in instances.items(): 815 assert owned_node_uuids.issuperset(inst.all_nodes), \ 816 "Instance %s's nodes changed while we kept the lock" % inst.name 817 818 inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups) 819 820 assert cur_group_uuid is None or cur_group_uuid in inst_groups, \ 821 "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
822 823
824 -def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
825 """Checks if the owned node groups are still correct for an instance. 826 827 @type cfg: L{config.ConfigWriter} 828 @param cfg: The cluster configuration 829 @type inst_uuid: string 830 @param inst_uuid: Instance UUID 831 @type owned_groups: set or frozenset 832 @param owned_groups: List of currently owned node groups 833 @type primary_only: boolean 834 @param primary_only: Whether to check node groups for only the primary node 835 836 """ 837 inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only) 838 839 if not owned_groups.issuperset(inst_groups): 840 raise errors.OpPrereqError("Instance %s's node groups changed since" 841 " locks were acquired, current groups are" 842 " are '%s', owning groups '%s'; retry the" 843 " operation" % 844 (cfg.GetInstanceName(inst_uuid), 845 utils.CommaJoin(inst_groups), 846 utils.CommaJoin(owned_groups)), 847 errors.ECODE_STATE) 848 849 return inst_groups
850 851
852 -def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
853 """Unpacks the result of change-group and node-evacuate iallocator requests. 854 855 Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and 856 L{constants.IALLOCATOR_MODE_CHG_GROUP}. 857 858 @type lu: L{LogicalUnit} 859 @param lu: Logical unit instance 860 @type alloc_result: tuple/list 861 @param alloc_result: Result from iallocator 862 @type early_release: bool 863 @param early_release: Whether to release locks early if possible 864 @type use_nodes: bool 865 @param use_nodes: Whether to display node names instead of groups 866 867 """ 868 (moved, failed, jobs) = alloc_result 869 870 if failed: 871 failreason = utils.CommaJoin("%s (%s)" % (name, reason) 872 for (name, reason) in failed) 873 lu.LogWarning("Unable to evacuate instances %s", failreason) 874 raise errors.OpExecError("Unable to evacuate instances %s" % failreason) 875 876 if moved: 877 lu.LogInfo("Instances to be moved: %s", 878 utils.CommaJoin( 879 "%s (to %s)" % 880 (name, _NodeEvacDest(use_nodes, group, node_names)) 881 for (name, group, node_names) in moved)) 882 883 return [map(compat.partial(_SetOpEarlyRelease, early_release), 884 map(opcodes.OpCode.LoadOpCode, ops)) 885 for ops in jobs]
886 887
888 -def _NodeEvacDest(use_nodes, group, node_names):
889 """Returns group or nodes depending on caller's choice. 890 891 """ 892 if use_nodes: 893 return utils.CommaJoin(node_names) 894 else: 895 return group
896 897
898 -def _SetOpEarlyRelease(early_release, op):
899 """Sets C{early_release} flag on opcodes if available. 900 901 """ 902 try: 903 op.early_release = early_release 904 except AttributeError: 905 assert not isinstance(op, opcodes.OpInstanceReplaceDisks) 906 907 return op
908 909
910 -def MapInstanceLvsToNodes(instances):
911 """Creates a map from (node, volume) to instance name. 912 913 @type instances: list of L{objects.Instance} 914 @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance} 915 object as value 916 917 """ 918 return dict(((node_uuid, vol), inst) 919 for inst in instances 920 for (node_uuid, vols) in inst.MapLVsByNode().items() 921 for vol in vols)
922 923
924 -def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
925 """Make sure that none of the given paramters is global. 926 927 If a global parameter is found, an L{errors.OpPrereqError} exception is 928 raised. This is used to avoid setting global parameters for individual nodes. 929 930 @type params: dictionary 931 @param params: Parameters to check 932 @type glob_pars: dictionary 933 @param glob_pars: Forbidden parameters 934 @type kind: string 935 @param kind: Kind of parameters (e.g. "node") 936 @type bad_levels: string 937 @param bad_levels: Level(s) at which the parameters are forbidden (e.g. 938 "instance") 939 @type good_levels: strings 940 @param good_levels: Level(s) at which the parameters are allowed (e.g. 941 "cluster or group") 942 943 """ 944 used_globals = glob_pars.intersection(params) 945 if used_globals: 946 msg = ("The following %s parameters are global and cannot" 947 " be customized at %s level, please modify them at" 948 " %s level: %s" % 949 (kind, bad_levels, good_levels, utils.CommaJoin(used_globals))) 950 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
951 952
953 -def IsExclusiveStorageEnabledNode(cfg, node):
954 """Whether exclusive_storage is in effect for the given node. 955 956 @type cfg: L{config.ConfigWriter} 957 @param cfg: The cluster configuration 958 @type node: L{objects.Node} 959 @param node: The node 960 @rtype: bool 961 @return: The effective value of exclusive_storage 962 963 """ 964 return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
965 966
967 -def CheckInstanceState(lu, instance, req_states, msg=None):
968 """Ensure that an instance is in one of the required states. 969 970 @param lu: the LU on behalf of which we make the check 971 @param instance: the instance to check 972 @param msg: if passed, should be a message to replace the default one 973 @raise errors.OpPrereqError: if the instance is not in the required state 974 975 """ 976 if msg is None: 977 msg = ("can't use instance from outside %s states" % 978 utils.CommaJoin(req_states)) 979 if instance.admin_state not in req_states: 980 raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % 981 (instance.name, instance.admin_state, msg), 982 errors.ECODE_STATE) 983 984 if constants.ADMINST_UP not in req_states: 985 pnode_uuid = instance.primary_node 986 if not lu.cfg.GetNodeInfo(pnode_uuid).offline: 987 all_hvparams = lu.cfg.GetClusterInfo().hvparams 988 ins_l = lu.rpc.call_instance_list( 989 [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid] 990 ins_l.Raise("Can't contact node %s for instance information" % 991 lu.cfg.GetNodeName(pnode_uuid), 992 prereq=True, ecode=errors.ECODE_ENVIRON) 993 if instance.name in ins_l.payload: 994 raise errors.OpPrereqError("Instance %s is running, %s" % 995 (instance.name, msg), errors.ECODE_STATE) 996 else: 997 lu.LogWarning("Primary node offline, ignoring check that instance" 998 " is down")
999 1000
1001 -def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1002 """Check the sanity of iallocator and node arguments and use the 1003 cluster-wide iallocator if appropriate. 1004 1005 Check that at most one of (iallocator, node) is specified. If none is 1006 specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT}, 1007 then the LU's opcode's iallocator slot is filled with the cluster-wide 1008 default iallocator. 1009 1010 @type iallocator_slot: string 1011 @param iallocator_slot: the name of the opcode iallocator slot 1012 @type node_slot: string 1013 @param node_slot: the name of the opcode target node slot 1014 1015 """ 1016 node = getattr(lu.op, node_slot, None) 1017 ialloc = getattr(lu.op, iallocator_slot, None) 1018 if node == []: 1019 node = None 1020 1021 if node is not None and ialloc is not None: 1022 raise errors.OpPrereqError("Do not specify both, iallocator and node", 1023 errors.ECODE_INVAL) 1024 elif ((node is None and ialloc is None) or 1025 ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT): 1026 default_iallocator = lu.cfg.GetDefaultIAllocator() 1027 if default_iallocator: 1028 setattr(lu.op, iallocator_slot, default_iallocator) 1029 else: 1030 raise errors.OpPrereqError("No iallocator or node given and no" 1031 " cluster-wide default iallocator found;" 1032 " please specify either an iallocator or a" 1033 " node, or set a cluster-wide default" 1034 " iallocator", errors.ECODE_INVAL)
1035 1036
1037 -def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
1038 faulty = [] 1039 1040 for dev in instance.disks: 1041 cfg.SetDiskID(dev, node_uuid) 1042 1043 result = rpc_runner.call_blockdev_getmirrorstatus( 1044 node_uuid, (instance.disks, instance)) 1045 result.Raise("Failed to get disk status from node %s" % 1046 cfg.GetNodeName(node_uuid), 1047 prereq=prereq, ecode=errors.ECODE_ENVIRON) 1048 1049 for idx, bdev_status in enumerate(result.payload): 1050 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: 1051 faulty.append(idx) 1052 1053 return faulty
1054 1055
1056 -def CheckNodeOnline(lu, node_uuid, msg=None):
1057 """Ensure that a given node is online. 1058 1059 @param lu: the LU on behalf of which we make the check 1060 @param node_uuid: the node to check 1061 @param msg: if passed, should be a message to replace the default one 1062 @raise errors.OpPrereqError: if the node is offline 1063 1064 """ 1065 if msg is None: 1066 msg = "Can't use offline node" 1067 if lu.cfg.GetNodeInfo(node_uuid).offline: 1068 raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)), 1069 errors.ECODE_STATE)
1070 1071
1072 -def CheckDiskTemplateEnabled(cluster, disk_template):
1073 """Helper function to check if a disk template is enabled. 1074 1075 @type cluster: C{objects.Cluster} 1076 @param cluster: the cluster's configuration 1077 @type disk_template: str 1078 @param disk_template: the disk template to be checked 1079 1080 """ 1081 assert disk_template is not None 1082 if disk_template not in constants.DISK_TEMPLATES: 1083 raise errors.OpPrereqError("'%s' is not a valid disk template." 1084 " Valid disk templates are: %s" % 1085 (disk_template, 1086 ",".join(constants.DISK_TEMPLATES))) 1087 if not disk_template in cluster.enabled_disk_templates: 1088 raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster." 1089 " Enabled disk templates are: %s" % 1090 (disk_template, 1091 ",".join(cluster.enabled_disk_templates)))
1092 1093
1094 -def CheckStorageTypeEnabled(cluster, storage_type):
1095 """Helper function to check if a storage type is enabled. 1096 1097 @type cluster: C{objects.Cluster} 1098 @param cluster: the cluster's configuration 1099 @type storage_type: str 1100 @param storage_type: the storage type to be checked 1101 1102 """ 1103 assert storage_type is not None 1104 assert storage_type in constants.STORAGE_TYPES 1105 # special case for lvm-pv, because it cannot be enabled 1106 # via disk templates 1107 if storage_type == constants.ST_LVM_PV: 1108 CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG) 1109 else: 1110 possible_disk_templates = \ 1111 utils.storage.GetDiskTemplatesOfStorageType(storage_type) 1112 for disk_template in possible_disk_templates: 1113 if disk_template in cluster.enabled_disk_templates: 1114 return 1115 raise errors.OpPrereqError("No disk template of storage type '%s' is" 1116 " enabled in this cluster. Enabled disk" 1117 " templates are: %s" % (storage_type, 1118 ",".join(cluster.enabled_disk_templates)))
1119 1120
1121 -def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
1122 """Checks ipolicy disk templates against enabled disk tempaltes. 1123 1124 @type ipolicy: dict 1125 @param ipolicy: the new ipolicy 1126 @type enabled_disk_templates: list of string 1127 @param enabled_disk_templates: list of enabled disk templates on the 1128 cluster 1129 @raises errors.OpPrereqError: if there is at least one allowed disk 1130 template that is not also enabled. 1131 1132 """ 1133 assert constants.IPOLICY_DTS in ipolicy 1134 allowed_disk_templates = ipolicy[constants.IPOLICY_DTS] 1135 not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates) 1136 if not_enabled: 1137 raise errors.OpPrereqError("The following disk template are allowed" 1138 " by the ipolicy, but not enabled on the" 1139 " cluster: %s" % utils.CommaJoin(not_enabled))
1140