Package ganeti :: Package cmdlib :: Module common
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.common

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Common functions used by multiple logical units.""" 
  23   
  24  import copy 
  25  import os 
  26   
  27  from ganeti import compat 
  28  from ganeti import constants 
  29  from ganeti import errors 
  30  from ganeti import hypervisor 
  31  from ganeti import locking 
  32  from ganeti import objects 
  33  from ganeti import opcodes 
  34  from ganeti import pathutils 
  35  from ganeti import rpc 
  36  from ganeti import ssconf 
  37  from ganeti import utils 
  38   
  39   
  40  # States of instance 
  41  INSTANCE_DOWN = [constants.ADMINST_DOWN] 
  42  INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] 
  43  INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] 
  44   
  45  #: Instance status in which an instance can be marked as offline/online 
  46  CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([ 
  47    constants.ADMINST_OFFLINE, 
  48    ])) 
  49   
  50   
def _ExpandItemName(fn, name, kind):
  """Resolve an item name to its full name.

  @param fn: the function to use for expansion; it is called with C{name}
      and its result is treated as "not found" when it is C{None}
  @param name: requested item name
  @param kind: text description used in the error message ('Node' or
      'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the expansion function returned C{None}

  """
  resolved = fn(name)
  if resolved is not None:
    return resolved

  raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                             errors.ECODE_NOENT)
66 67
def ExpandInstanceName(cfg, name):
  """Expand an instance name via the configuration.

  Wrapper over L{_ExpandItemName} using C{cfg.ExpandInstanceName} as the
  expansion function and "Instance" as the item kind.

  """
  expander = cfg.ExpandInstanceName
  return _ExpandItemName(expander, name, "Instance")
71 72
def ExpandNodeName(cfg, name):
  """Expand a node name via the configuration.

  Wrapper over L{_ExpandItemName} using C{cfg.ExpandNodeName} as the
  expansion function and "Node" as the item kind.

  """
  expander = cfg.ExpandNodeName
  return _ExpandItemName(expander, name, "Node")
76 77
def ShareAll():
  """Returns a dict declaring all lock levels shared.

  @rtype: dict
  @return: a new dictionary with one key per entry of L{locking.LEVELS},
      each mapped to 1

  """
  return dict((level, 1) for level in locking.LEVELS)
83 84
def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Verifies that the owned instances still match a node group's instances.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: Currently owned instances
  @return: the instances of the group as reported by the configuration
  @raise errors.OpPrereqError: if the owned instances differ from the ones
      the configuration reports for the group

  """
  current = cfg.GetNodeGroupInstances(group_uuid)

  if owned_instances != current:
    details = (group_uuid,
               utils.CommaJoin(current),
               utils.CommaJoin(owned_instances))
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" % details,
                               errors.ECODE_STATE)

  return current
107 108
def GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None (or an empty list) for all nodes
  @rtype: list
  @return: the expanded names in the order given, or, when no names were
      given, the configuration's node list passed through L{utils.NiceSort}
  @raise errors.OpPrereqError: if any of the passed nodes is not found

  """
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  expanded = []
  for node_name in nodes:
    expanded.append(ExpandNodeName(lu.cfg, node_name))
  return expanded
125 126
def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None (or an empty list) for
      all instances
  @rtype: list
  @return: the expanded names in the order given, or, when no names were
      given, the configuration's instance list passed through
      L{utils.NiceSort}
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for inst_name in instances:
    expanded.append(ExpandInstanceName(lu.cfg, inst_name))
  return expanded
145 146
def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  This is best-effort: any exception raised while running the hooks is
  reported through C{lu.LogWarning} and not propagated to the caller.

  @param lu: the logical unit on whose behalf the hooks are run
  @param node_name: name of the single node on which the post phase is run

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  # "as" form: valid on Python 2.6+ and 3 (the comma form is Python 2 only)
  except Exception as err: # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)
157 158
def RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  cfg = lu.cfg

  # Work out where the files have to go
  cluster = cfg.GetClusterInfo()
  master_name = cfg.GetNodeInfo(cfg.GetMasterNode()).name

  online_nodes = cfg.GetOnlineNodeList()
  vm_capable = cfg.GetVmCapableNodeList()
  # Only online nodes which are also vm-capable receive the VM files
  vm_nodes = list(frozenset(online_nodes).intersection(vm_capable))

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # The master node is never a distribution target
  for targets in (online_nodes, vm_nodes):
    if master_name in targets:
      targets.remove(master_name)

  # Work out what has to be copied
  (files_all, _, files_mc, files_vm) = ComputeAncillaryFiles(cluster, True)

  # The configuration file must never be re-distributed from here
  assert (pathutils.CLUSTER_CONF_FILE not in files_all and
          pathutils.CLUSTER_CONF_FILE not in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  # Upload the files
  for (targets, files) in [(online_nodes, files_all), (vm_nodes, files_vm)]:
    for fname in files:
      UploadHelper(lu, targets, fname)
208 209
def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @param cluster: cluster object; its C{modify_etc_hosts},
      C{use_external_mip_script} and C{enabled_hypervisors} attributes are
      read
  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed
  @rtype: tuple
  @return: four sets of file names: (files for all nodes, optional files,
      files for master candidates only, files for VM-capable nodes only)

  """
  # Files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

    # File storage
    if (constants.ENABLE_FILE_STORAGE or
        constants.ENABLE_SHARED_FILE_STORAGE):
      files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
      files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes: first element of each
  # enabled hypervisor's ancillary file lists
  files_vm = set()
  for hv_name in cluster.enabled_hypervisors:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    files_vm.update(hv_class.GetAncillaryFiles()[0])

  # The second element holds the hypervisor's optional files
  for hv_name in cluster.enabled_hypervisors:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    files_opt.update(hv_class.GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          len(files_all) + len(files_mc) + len(files_vm)), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)
287 288
def UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  Nothing is done if C{fname} does not exist locally. Otherwise the file is
  uploaded via C{lu.rpc.call_upload_file} and every per-node failure is
  reported through C{lu.LogWarning}.

  @param lu: calling logical unit
  @param nodes: the nodes to upload the file to
  @param fname: path of the file to upload

  """
  if not os.path.exists(fname):
    return

  upload_results = lu.rpc.call_upload_file(nodes, fname)
  for (to_node, to_result) in upload_results.items():
    failure = to_result.fail_msg
    if not failure:
      continue
    lu.LogWarning("Copy of file %s to node %s failed: %s" %
                  (fname, to_node, failure))
301 302
def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects (C{None} is treated as
      an empty dict)
  @return: The verified and updated dict, or C{None} if the opcode carried
      no (or an empty) hv state
  @raise errors.OpPrereqError: if the opcode names hypervisors which are
      not in L{constants.HYPER_TYPES}

  """
  if not op_input:
    return None

  unknown_hvs = set(op_input) - constants.HYPER_TYPES
  if unknown_hvs:
    raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                               " %s" % utils.CommaJoin(unknown_hvs),
                               errors.ECODE_INVAL)

  if obj_input is None:
    obj_input = {}

  return _UpdateAndVerifySubDict(obj_input, op_input,
                                 constants.HVSTS_PARAMETER_TYPES)
323 324
def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  Only the storage types present in the opcode input appear in the result.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects (C{None} is treated as
      an empty dict)
  @return: The verified and updated dict, or C{None} if the opcode carried
      no (or an empty) disk state
  @raise errors.OpPrereqError: if the opcode names storage types which are
      not in L{constants.DS_VALID_TYPES}

  """
  if not op_input:
    return None

  unknown_types = set(op_input) - constants.DS_VALID_TYPES
  if unknown_types:
    raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                               utils.CommaJoin(unknown_types),
                               errors.ECODE_INVAL)

  type_check = constants.DSS_PARAMETER_TYPES
  if obj_input is None:
    obj_input = {}

  merged = {}
  for (storage, state) in op_input.items():
    merged[storage] = _UpdateAndVerifySubDict(obj_input.get(storage, {}),
                                              state, type_check)
  return merged
346 347
def CheckOSParams(lu, required, nodenames, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check;
      non-vm_capable nodes are filtered out before the check
  @type osname: string
  @param osname: the name of the OS whose parameters are validated
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  vm_nodes = _FilterVmNodes(lu, nodenames)
  checks = [constants.OS_VALIDATE_PARAMETERS]
  validation = lu.rpc.call_os_validate(vm_nodes, required, osname, checks,
                                       osparams)

  for (node, nres) in validation.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" % node)
    if nres.payload:
      continue
    lu.LogInfo("OS %s not found on node %s, validation skipped",
               osname, node)
376 377
def CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify. The given parameters
  are layered over the cluster-level parameters of the hypervisor before
  they are sent for validation; offline nodes are skipped.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check;
      non-vm_capable nodes are filtered out before the check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  vm_nodes = _FilterVmNodes(lu, nodenames)

  cluster_defaults = lu.cfg.GetClusterInfo().hvparams.get(hvname, {})
  hvfull = objects.FillDict(cluster_defaults, hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(vm_nodes, hvname, hvfull)
  for node in vm_nodes:
    info = hvinfo[node]
    if not info.offline:
      info.Raise("Hypervisor parameter validation failed on node %s" % node)
406 407
def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  Asks the configuration to maintain the candidate pool, re-adds every
  node it reports back to the LU's context and logs a note if there are
  more master candidates than desired.

  @param lu: the logical unit on whose behalf we execute
  @param exceptions: passed unchanged to C{cfg.MaintainCandidatePool} and
      C{cfg.GetMasterCandidateStats}

  """
  promoted = lu.cfg.MaintainCandidatePool(exceptions)
  if promoted:
    promoted_names = utils.CommaJoin(node.name for node in promoted)
    lu.LogInfo("Promoted nodes to master candidate role: %s", promoted_names)
    # The entries are node objects (their .name is used above)
    for node in promoted:
      lu.context.ReaddNode(node)

  (mc_now, mc_max, _) = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))
422 423
def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  @type nresult: dict
  @param nresult: node result; the L{constants.NV_PVLIST} and
      L{constants.NV_EXCLUSIVEPVS} entries are read
  @param exclusive_storage: whether to also run the exclusive storage
      checks
  @rtype: tuple
  @return: (list of error messages, second result of
      L{utils.LvmExclusiveCheckNodePvs} or C{None} when the exclusive
      storage checks were not run or no PV list was available)

  """
  raw_pvs = nresult.get(constants.NV_PVLIST, None)
  if raw_pvs is None:
    return (["Can't get PV list from node"], None)

  pvlist = [objects.LvmPvInfo.FromDict(pv_data) for pv_data in raw_pvs]

  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  errlist = ["Invalid character ':' in PV '%s' of VG '%s'" %
             (pv.name, pv.vg_name)
             for pv in pvlist if ":" in pv.name]

  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
      errlist.extend("PV %s is shared among unrelated LVs (%s)" %
                     (pvname, utils.CommaJoin(lvlist))
                     for (pvname, lvlist) in shared_pvs)

  return (errlist, es_pvinfo)
451 452
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  A bound missing from C{ispecs} for the given parameter defaults to the
  value itself, i.e. it can never be violated.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use; C{None} and
      L{constants.VALUE_AUTO} are never reported as violations
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None

  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if not (value > max_v or min_v > value):
    return None

  fqn = name
  if qualifier:
    fqn = "%s/%s" % (name, qualifier)
  return ("%s value %s is not in range [%s, %s]" %
          (fqn, value, min_v, max_v))
476 477
def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  The specs are checked against every min/max pair of the policy; the
  violations of the pair producing the fewest of them are reported.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list of no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ]
  # One size check per disk, qualified with the disk index
  for (idx, size) in enumerate(disk_sizes):
    test_settings.append((constants.ISPEC_DISK_SIZE, str(idx), size))
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))

  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = []
    for (name, qualifier, value) in test_settings:
      violation = _compute_fn(name, qualifier, minmax, value)
      if violation:
        errs.append(violation)
    # Keep the first min/max pair with the strictly smallest error count
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
531 532
def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  The memory, CPU and spindle figures are taken from the instance's
  backend parameters as filled by the cluster; disks, NICs and the disk
  template are read from the instance itself.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @return: whatever C{_compute_fn} returns
  @see: L{ComputeIPolicySpecViolation}

  """
  filled_be = cfg.GetClusterInfo().FillBE(instance)
  disks = instance.disks

  return _compute_fn(ipolicy,
                     filled_be[constants.BE_MAXMEM],
                     filled_be[constants.BE_VCPUS],
                     len(disks),
                     len(instance.nics),
                     [disk.size for disk in disks],
                     filled_be[constants.BE_SPINDLE_USE],
                     instance.disk_template)
558 559
def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes a set of instances who violates given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: list of L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  violating = set()
  for inst in instances:
    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg):
      violating.add(inst.name)
  return frozenset(violating)
573 574
def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @rtype: frozenset
  @return: The names of the instances which violate the new ipolicy but
      did not violate the old one

  """
  violating_new = _ComputeViolatingInstances(new_ipolicy, instances, cfg)
  violating_old = _ComputeViolatingInstances(old_ipolicy, instances, cfg)
  return violating_new - violating_old
589 590
def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  The old parameters are deep-copied, so neither input is modified.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  result = copy.deepcopy(old_params)

  for (key, val) in update_dict.items():
    remove = ((use_default and val == constants.VALUE_DEFAULT) or
              (use_none and val is None))
    if remove:
      # Removing a key which is not present is not an error
      result.pop(key, None)
    else:
      result[key] = val

  return result
622 623
def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  The old policy is deep-copied and the entries of the new policy are
  applied on top of it. An empty value, L{constants.VALUE_DEFAULT} or a
  list containing only L{constants.VALUE_DEFAULT} requests the removal of
  an entry, which is only possible for group policies. The resulting
  policy is checked with L{objects.InstancePolicy.CheckParameterSyntax}.

  @param old_ipolicy: the current policy
  @param new_ipolicy: the policy entries to apply
  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries
  @return: the updated policy
  @raise errors.OpPrereqError: on unknown keys, on values which cannot be
    converted, on attempts to unset a cluster-level entry, on standard
    specs given for a group policy and on an invalid resulting policy

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError) as err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        # Note: the types are forced in place, on the caller's dicts
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError as err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy
676 677
def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  Looks up the instance's disk parameters in the configuration and hands
  them, together with the instance's disk template, to the rpc module.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @returns The annotated disk copies
  @see L{rpc.AnnotateDiskParams}

  """
  disk_params = cfg.GetInstanceDiskParams(instance)
  return rpc.AnnotateDiskParams(instance.disk_template, devs, disk_params)
691 692
def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  ndparams = cfg.GetNdParams(node)
  return ndparams[constants.ND_OOB_PROGRAM]
704 705
def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  Every sub dict named in C{updates} is merged with the matching sub dict
  of C{base} (see L{GetUpdatedParams}) and type-checked; sub dicts of
  C{base} which are not named in C{updates} are copied unchanged.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  merged = copy.deepcopy(base)

  verified = {}
  for (key, value) in updates.items():
    new_sub = GetUpdatedParams(base.get(key, {}), value)
    utils.ForceDictType(new_sub, type_check)
    verified[key] = new_sub

  merged.update(verified)
  return merged
def _FilterVmNodes(lu, nodenames):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @rtype: list
  @return: the nodes from C{nodenames}, in the same order, which the
      configuration does not list as non-vm_capable

  """
  non_vm_capable = frozenset(lu.cfg.GetNonVmCapableNodeList())

  kept = []
  for name in nodenames:
    if name not in non_vm_capable:
      kept.append(name)
  return kept
739 740
def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: the given iallocator name or, if none was given, the
      cluster-wide default
  @raise errors.OpPrereqError: if neither the opcode nor the cluster
      provides an iallocator

  """
  # The cluster default is only consulted when the opcode named none
  chosen = ialloc or cfg.GetDefaultIAllocator()
  if chosen:
    return chosen

  raise errors.OpPrereqError("No iallocator was specified, neither in the"
                             " opcode nor as a cluster-wide default",
                             errors.ECODE_INVAL)
762 763
def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  Runs L{CheckInstanceNodeGroups} for every given instance; the node and
  current-group conditions are only checked through assertions.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: set or frozenset
  @param owned_groups: Owned groups
  @type owned_nodes: set or frozenset
  @param owned_nodes: Owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for inst_name in instances:
    inst = instances[inst_name]

    assert owned_nodes.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % inst_name

    groups = CheckInstanceNodeGroups(cfg, inst_name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in groups, \
      "Instance %s has no node in group %s" % (inst_name, cur_group_uuid)
788 789
def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node
  @return: the instance's node groups as reported by the configuration
  @raise errors.OpPrereqError: if the instance uses a node group which is
      not among the owned ones

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups
817 818
def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator, unpacked as
      (moved, failed, jobs)
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups
  @rtype: list of lists
  @return: one list of loaded opcodes per job
  @raise errors.OpExecError: if the iallocator reported failed instances

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    destinations = ("%s (to %s)" %
                    (name, _NodeEvacDest(use_nodes, group, nodes))
                    for (name, group, nodes) in moved)
    lu.LogInfo("Instances to be moved: %s", utils.CommaJoin(destinations))

  result = []
  for ops in jobs:
    # Load all opcodes of a job first, then flag them
    loaded = [opcodes.OpCode.LoadOpCode(op) for op in ops]
    result.append([_SetOpEarlyRelease(early_release, op) for op in loaded])
  return result
852 853
def _NodeEvacDest(use_nodes, group, nodes):
  """Returns group or nodes depending on caller's choice.

  @param use_nodes: whether to describe the destination by its nodes
  @param group: the destination group, returned as is if C{use_nodes} is
      false
  @param nodes: the destination nodes, returned comma-joined if
      C{use_nodes} is true

  """
  if not use_nodes:
    return group

  return utils.CommaJoin(nodes)
862 863
def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  Opcodes which refuse the attribute are left untouched; for those it is
  only asserted that they are not L{opcodes.OpInstanceReplaceDisks}.

  @param early_release: the value to set
  @param op: the opcode to modify
  @return: the same opcode object

  """
  try:
    setattr(op, "early_release", early_release)
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op
874 875
def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  If the same (node, volume) pair is reported for several instances, the
  instance processed last wins.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  mapping = {}
  for inst in instances:
    for (node, vols) in inst.MapLVsByNode().items():
      for vol in vols:
        mapping[(node, vol)] = inst.name
  return mapping
887 888
def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: set or frozenset
  @param glob_pars: Forbidden parameters (its C{intersection} method is
      used)
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")
  @raise errors.OpPrereqError: if any forbidden parameter is used

  """
  used_globals = glob_pars.intersection(params)
  if not used_globals:
    return

  details = (kind, bad_levels, good_levels, utils.CommaJoin(used_globals))
  msg = ("The following %s parameters are global and cannot"
         " be customized at %s level, please modify them at"
         " %s level: %s" % details)
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
916 917
def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  ndparams = cfg.GetNdParams(node)
  return ndparams[constants.ND_EXCLUSIVE_STORAGE]
930 931
def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  Besides the admin state, if L{constants.ADMINST_UP} is not among the
  required states, the primary node is asked whether the instance is
  actually running; this is skipped with a warning if that node is offline.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param req_states: the admin states the instance may be in
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))

  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP in req_states:
    # A running instance is acceptable, nothing more to verify
    return

  pnode = instance.primary_node
  if lu.cfg.GetNodeInfo(pnode).offline:
    lu.LogWarning("Primary node offline, ignoring check that instance"
                  " is down")
    return

  running = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  running.Raise("Can't contact node %s for instance information" % pnode,
                prereq=True, ecode=errors.ECODE_ENVIRON)
  if instance.name in running.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, msg), errors.ECODE_STATE)
961 962
def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @param lu: the logical unit whose opcode is checked and possibly updated
  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot
  @raise errors.OpPrereqError: if both are specified, or if the default
      iallocator is needed but the cluster has none

  """
  opcode = lu.op
  node = getattr(opcode, node_slot, None)
  ialloc = getattr(opcode, iallocator_slot, None)
  # An empty node list counts as "no node given"
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)

  want_default = ((node is None and ialloc is None) or
                  ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT)
  if not want_default:
    return

  default_iallocator = lu.cfg.GetDefaultIAllocator()
  if not default_iallocator:
    raise errors.OpPrereqError("No iallocator or node given and no"
                               " cluster-wide default iallocator found;"
                               " please specify either an iallocator or a"
                               " node, or set a cluster-wide default"
                               " iallocator", errors.ECODE_INVAL)

  setattr(lu.op, iallocator_slot, default_iallocator)
997 998
def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  """Finds the disks of an instance which are reported faulty on a node.

  @param cfg: the configuration; C{SetDiskID} is called for every disk of
      the instance before the node is queried
  @param rpc_runner: the RPC runner used to query the mirror status
  @param instance: the instance whose disks are checked
  @param node_name: the node to query
  @param prereq: passed as C{prereq} to the RPC result's C{Raise} method
  @rtype: list
  @return: the indices of the disks whose local disk status is
      L{constants.LDS_FAULTY}

  """
  for disk in instance.disks:
    cfg.SetDiskID(disk, node_name)

  status = rpc_runner.call_blockdev_getmirrorstatus(node_name,
                                                    (instance.disks,
                                                     instance))
  status.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  # Entries without a status are skipped
  return [idx for (idx, bdev_status) in enumerate(status.payload)
          if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY]
1016 1017
def CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if not lu.cfg.GetNodeInfo(node).offline:
    return

  if msg is None:
    msg = "Can't use offline node"
  raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
1031