
Source Code for Module ganeti.cmdlib.instance_create

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30  """Logical unit for creating a single instance.""" 
  31   
  32  import OpenSSL 
  33  import logging 
  34  import os 
  35   
  36   
  37  from ganeti import compat 
  38  from ganeti import constants 
  39  from ganeti import errors 
  40  from ganeti import hypervisor 
  41  from ganeti import locking 
  42  from ganeti.masterd import iallocator 
  43  from ganeti import masterd 
  44  from ganeti import netutils 
  45  from ganeti import objects 
  46  from ganeti import pathutils 
  47  from ganeti import utils 
  48  from ganeti import serializer 
  49   
  50  from ganeti.cmdlib.base import LogicalUnit 
  51   
  52  from ganeti.cmdlib.common import \ 
  53    CheckNodeOnline, \ 
  54    CheckParamsNotGlobal, \ 
  55    IsExclusiveStorageEnabledNode, CheckHVParams, CheckOSParams, \ 
  56    ExpandNodeUuidAndName, \ 
  57    IsValidDiskAccessModeCombination, \ 
  58    CheckDiskTemplateEnabled, CheckIAllocatorOrNode, CheckOSImage 
  59  from ganeti.cmdlib.instance_helpervm import RunWithHelperVM 
  60  from ganeti.cmdlib.instance_storage import CalculateFileStorageDir, \ 
  61    CheckNodesFreeDiskPerVG, CheckRADOSFreeSpace, CheckSpindlesExclusiveStorage, \ 
  62    ComputeDiskSizePerVG, CreateDisks, \ 
  63    GenerateDiskTemplate, CommitDisks, \ 
  64    WaitForSync, ComputeDisks, \ 
  65    ImageDisks, WipeDisks 
  66  from ganeti.cmdlib.instance_utils import \ 
  67    CheckNodeNotDrained, CopyLockList, \ 
  68    ReleaseLocks, CheckNodeVmCapable, \ 
  69    RemoveDisks, CheckNodeFreeMemory, \ 
  70    UpdateMetadata, CheckForConflictingIp, \ 
  71    ComputeInstanceCommunicationNIC, \ 
  72    ComputeIPolicyInstanceSpecViolation, \ 
  73    CheckHostnameSane, CheckOpportunisticLocking, \ 
  74    ComputeFullBeParams, ComputeNics, GetClusterDomainSecret, \ 
  75    CheckInstanceExistence, CreateInstanceAllocRequest, BuildInstanceHookEnv, \ 
  76    NICListToTuple, CheckNicsBridgesExist, CheckCompressionTool 
  77  import ganeti.masterd.instance 
  78   
  79   
  80  class LUInstanceCreate(LogicalUnit):
  81    """Create an instance.
  82
  83    """
  84    HPATH = "instance-add"
  85    HTYPE = constants.HTYPE_INSTANCE
  86    REQ_BGL = False
  87
  88    def _CheckDiskTemplateValid(self):
  89      """Checks validity of disk template.
  90
  91      """
  92      cluster = self.cfg.GetClusterInfo()
  93      if self.op.disk_template is None:
  94        # FIXME: It would be better to take the default disk template from the
  95        # ipolicy, but for the ipolicy we need the primary node, which we get from
  96        # the iallocator, which wants the disk template as input. To solve this
  97        # chicken-and-egg problem, it should be possible to specify just a node
  98        # group from the iallocator and take the ipolicy from that.
  99        self.op.disk_template = cluster.enabled_disk_templates[0]
 100      CheckDiskTemplateEnabled(cluster, self.op.disk_template)
101
 102    def _CheckDiskArguments(self):
 103      """Checks validity of disk-related arguments.
 104
 105      """
 106      # check that disk's names are unique and valid
 107      utils.ValidateDeviceNames("disk", self.op.disks)
 108
 109      self._CheckDiskTemplateValid()
 110
 111      # check disks. parameter names and consistent adopt/no-adopt strategy
 112      has_adopt = has_no_adopt = False
 113      for disk in self.op.disks:
 114        if self.op.disk_template != constants.DT_EXT:
 115          utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
 116        if constants.IDISK_ADOPT in disk:
 117          has_adopt = True
 118        else:
 119          has_no_adopt = True
 120      if has_adopt and has_no_adopt:
 121        raise errors.OpPrereqError("Either all disks are adopted or none is",
 122                                   errors.ECODE_INVAL)
 123      if has_adopt:
 124        if self.op.disk_template not in constants.DTS_MAY_ADOPT:
 125          raise errors.OpPrereqError("Disk adoption is not supported for the"
 126                                     " '%s' disk template" %
 127                                     self.op.disk_template,
 128                                     errors.ECODE_INVAL)
 129        if self.op.iallocator is not None:
 130          raise errors.OpPrereqError("Disk adoption not allowed with an"
 131                                     " iallocator script", errors.ECODE_INVAL)
 132        if self.op.mode == constants.INSTANCE_IMPORT:
 133          raise errors.OpPrereqError("Disk adoption not allowed for"
 134                                     " instance import", errors.ECODE_INVAL)
 135      else:
 136        if self.op.disk_template in constants.DTS_MUST_ADOPT:
 137          raise errors.OpPrereqError("Disk template %s requires disk adoption,"
 138                                     " but no 'adopt' parameter given" %
 139                                     self.op.disk_template,
 140                                     errors.ECODE_INVAL)
 141
 142      self.adopt_disks = has_adopt
143
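The adopt/no-adopt consistency check above operates on the opcode-level disk specifications, which are plain dictionaries keyed by the IDISK_* constants. A minimal sketch of the three cases (volume names and sizes are invented, not taken from the module):

    valid_plain = [{constants.IDISK_SIZE: 10240},
                   {constants.IDISK_SIZE: 20480}]          # no adoption at all
    valid_adopt = [{constants.IDISK_VG: "xenvg", constants.IDISK_ADOPT: "lv1"},
                   {constants.IDISK_VG: "xenvg", constants.IDISK_ADOPT: "lv2"}]
    invalid_mix = [{constants.IDISK_SIZE: 10240},
                   {constants.IDISK_ADOPT: "lv2"}]          # rejected: all or none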
 144    def _CheckVLANArguments(self):
 145      """ Check validity of VLANs if given
 146
 147      """
 148      for nic in self.op.nics:
 149        vlan = nic.get(constants.INIC_VLAN, None)
 150        if vlan:
 151          if vlan[0] == ".":
 152            # vlan starting with dot means single untagged vlan,
 153            # might be followed by trunk (:)
 154            if not vlan[1:].isdigit():
 155              vlanlist = vlan[1:].split(':')
 156              for vl in vlanlist:
 157                if not vl.isdigit():
 158                  raise errors.OpPrereqError("Specified VLAN parameter is "
 159                                             "invalid : %s" % vlan,
 160                                             errors.ECODE_INVAL)
 161          elif vlan[0] == ":":
 162            # Trunk - tagged only
 163            vlanlist = vlan[1:].split(':')
 164            for vl in vlanlist:
 165              if not vl.isdigit():
 166                raise errors.OpPrereqError("Specified VLAN parameter is invalid"
 167                                           " : %s" % vlan, errors.ECODE_INVAL)
 168          elif vlan.isdigit():
 169            # This is the simplest case. No dots, only single digit
 170            # -> Create untagged access port, dot needs to be added
 171            nic[constants.INIC_VLAN] = "." + vlan
 172          else:
 173            raise errors.OpPrereqError("Specified VLAN parameter is invalid"
 174                                       " : %s" % vlan, errors.ECODE_INVAL)
175
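In short, the VLAN check above accepts four shapes of value and rejects everything else. A few illustrative strings (not taken from the module):

    "100"           # bare ID, normalized above to ".100" (untagged access port)
    ".100"          # untagged VLAN 100
    ".100:200:300"  # untagged VLAN 100 plus tagged (trunk) VLANs 200 and 300
    ":200:300"      # tagged-only trunk carrying VLANs 200 and 300
    "abc", "1.2"    # rejected with OpPrereqError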
176 - def CheckArguments(self):
177 """Check arguments. 178 179 """ 180 if self.op.forthcoming and self.op.commit: 181 raise errors.OpPrereqError("Forthcoming generation and commiting are" 182 " mutually exclusive", errors.ECODE_INVAL) 183 184 # do not require name_check to ease forward/backward compatibility 185 # for tools 186 if self.op.no_install and self.op.start: 187 self.LogInfo("No-installation mode selected, disabling startup") 188 self.op.start = False 189 # validate/normalize the instance name 190 self.op.instance_name = \ 191 netutils.Hostname.GetNormalizedName(self.op.instance_name) 192 193 if self.op.ip_check and not self.op.name_check: 194 # TODO: make the ip check more flexible and not depend on the name check 195 raise errors.OpPrereqError("Cannot do IP address check without a name" 196 " check", errors.ECODE_INVAL) 197 198 # instance name verification 199 if self.op.name_check: 200 self.hostname = CheckHostnameSane(self, self.op.instance_name) 201 self.op.instance_name = self.hostname.name 202 # used in CheckPrereq for ip ping check 203 self.check_ip = self.hostname.ip 204 else: 205 self.check_ip = None 206 207 # add NIC for instance communication 208 if self.op.instance_communication: 209 nic_name = ComputeInstanceCommunicationNIC(self.op.instance_name) 210 211 for nic in self.op.nics: 212 if nic.get(constants.INIC_NAME, None) == nic_name: 213 break 214 else: 215 self.op.nics.append({constants.INIC_NAME: nic_name, 216 constants.INIC_MAC: constants.VALUE_GENERATE, 217 constants.INIC_IP: constants.NIC_IP_POOL, 218 constants.INIC_NETWORK: 219 self.cfg.GetInstanceCommunicationNetwork()}) 220 221 # timeouts for unsafe OS installs 222 if self.op.helper_startup_timeout is None: 223 self.op.helper_startup_timeout = constants.HELPER_VM_STARTUP 224 225 if self.op.helper_shutdown_timeout is None: 226 self.op.helper_shutdown_timeout = constants.HELPER_VM_SHUTDOWN 227 228 # check nics' parameter names 229 for nic in self.op.nics: 230 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES) 231 # check that NIC's parameters names are unique and valid 232 utils.ValidateDeviceNames("NIC", self.op.nics) 233 234 self._CheckVLANArguments() 235 236 self._CheckDiskArguments() 237 assert self.op.disk_template is not None 238 239 # file storage checks 240 if (self.op.file_driver and 241 not self.op.file_driver in constants.FILE_DRIVER): 242 raise errors.OpPrereqError("Invalid file driver name '%s'" % 243 self.op.file_driver, errors.ECODE_INVAL) 244 245 # set default file_driver if unset and required 246 if (not self.op.file_driver and 247 self.op.disk_template in constants.DTS_FILEBASED): 248 self.op.file_driver = constants.FD_DEFAULT 249 250 ### Node/iallocator related checks 251 CheckIAllocatorOrNode(self, "iallocator", "pnode") 252 253 if self.op.pnode is not None: 254 if self.op.disk_template in constants.DTS_INT_MIRROR: 255 if self.op.snode is None: 256 raise errors.OpPrereqError("The networked disk templates need" 257 " a mirror node", errors.ECODE_INVAL) 258 elif self.op.snode: 259 self.LogWarning("Secondary node will be ignored on non-mirrored disk" 260 " template") 261 self.op.snode = None 262 263 CheckOpportunisticLocking(self.op) 264 265 if self.op.mode == constants.INSTANCE_IMPORT: 266 # On import force_variant must be True, because if we forced it at 267 # initial install, our only chance when importing it back is that it 268 # works again! 
269 self.op.force_variant = True 270 271 if self.op.no_install: 272 self.LogInfo("No-installation mode has no effect during import") 273 274 if objects.GetOSImage(self.op.osparams): 275 self.LogInfo("OS image has no effect during import") 276 elif self.op.mode == constants.INSTANCE_CREATE: 277 os_image = CheckOSImage(self.op) 278 279 if self.op.os_type is None and os_image is None: 280 raise errors.OpPrereqError("No guest OS or OS image specified", 281 errors.ECODE_INVAL) 282 283 if self.op.os_type is not None \ 284 and self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os: 285 raise errors.OpPrereqError("Guest OS '%s' is not allowed for" 286 " installation" % self.op.os_type, 287 errors.ECODE_STATE) 288 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 289 if objects.GetOSImage(self.op.osparams): 290 self.LogInfo("OS image has no effect during import") 291 292 self._cds = GetClusterDomainSecret() 293 294 # Check handshake to ensure both clusters have the same domain secret 295 src_handshake = self.op.source_handshake 296 if not src_handshake: 297 raise errors.OpPrereqError("Missing source handshake", 298 errors.ECODE_INVAL) 299 300 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds, 301 src_handshake) 302 if errmsg: 303 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg, 304 errors.ECODE_INVAL) 305 306 # Load and check source CA 307 self.source_x509_ca_pem = self.op.source_x509_ca 308 if not self.source_x509_ca_pem: 309 raise errors.OpPrereqError("Missing source X509 CA", 310 errors.ECODE_INVAL) 311 312 try: 313 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem, 314 self._cds) 315 except OpenSSL.crypto.Error, err: 316 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" % 317 (err, ), errors.ECODE_INVAL) 318 319 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) 320 if errcode is not None: 321 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ), 322 errors.ECODE_INVAL) 323 324 self.source_x509_ca = cert 325 326 src_instance_name = self.op.source_instance_name 327 if not src_instance_name: 328 raise errors.OpPrereqError("Missing source instance name", 329 errors.ECODE_INVAL) 330 331 self.source_instance_name = \ 332 netutils.GetHostname(name=src_instance_name).name 333 334 else: 335 raise errors.OpPrereqError("Invalid instance creation mode %r" % 336 self.op.mode, errors.ECODE_INVAL)
337
338 - def ExpandNames(self):
339 """ExpandNames for CreateInstance. 340 341 Figure out the right locks for instance creation. 342 343 """ 344 self.needed_locks = {} 345 346 if self.op.commit: 347 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 348 if name is None: 349 raise errors.OpPrereqError("Instance %s unknown" % 350 self.op.instance_name, 351 errors.ECODE_INVAL) 352 self.op.instance_name = name 353 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 354 raise errors.OpPrereqError("Instance %s (with uuid %s) not forthcoming" 355 " but --commit was passed." % (name, uuid), 356 errors.ECODE_STATE) 357 logging.debug("Verified that instance %s with uuid %s is forthcoming", 358 name, uuid) 359 else: 360 # this is just a preventive check, but someone might still add this 361 # instance in the meantime; we check again in CheckPrereq 362 CheckInstanceExistence(self, self.op.instance_name) 363 364 self.add_locks[locking.LEVEL_INSTANCE] = self.op.instance_name 365 366 if self.op.commit: 367 (uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 368 self.needed_locks[locking.LEVEL_NODE] = self.cfg.GetInstanceNodes(uuid) 369 logging.debug("Forthcoming instance %s resides on %s", uuid, 370 self.needed_locks[locking.LEVEL_NODE]) 371 elif self.op.iallocator: 372 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by 373 # specifying a group on instance creation and then selecting nodes from 374 # that group 375 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 376 377 if self.op.opportunistic_locking: 378 self.opportunistic_locks[locking.LEVEL_NODE] = True 379 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True 380 if self.op.disk_template == constants.DT_DRBD8: 381 self.opportunistic_locks_count[locking.LEVEL_NODE] = 2 382 self.opportunistic_locks_count[locking.LEVEL_NODE_RES] = 2 383 else: 384 (self.op.pnode_uuid, self.op.pnode) = \ 385 ExpandNodeUuidAndName(self.cfg, self.op.pnode_uuid, self.op.pnode) 386 nodelist = [self.op.pnode_uuid] 387 if self.op.snode is not None: 388 (self.op.snode_uuid, self.op.snode) = \ 389 ExpandNodeUuidAndName(self.cfg, self.op.snode_uuid, self.op.snode) 390 nodelist.append(self.op.snode_uuid) 391 self.needed_locks[locking.LEVEL_NODE] = nodelist 392 393 # in case of import lock the source node too 394 if self.op.mode == constants.INSTANCE_IMPORT: 395 src_node = self.op.src_node 396 src_path = self.op.src_path 397 398 if src_path is None: 399 self.op.src_path = src_path = self.op.instance_name 400 401 if src_node is None: 402 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 403 self.op.src_node = None 404 if os.path.isabs(src_path): 405 raise errors.OpPrereqError("Importing an instance from a path" 406 " requires a source node option", 407 errors.ECODE_INVAL) 408 else: 409 (self.op.src_node_uuid, self.op.src_node) = (_, src_node) = \ 410 ExpandNodeUuidAndName(self.cfg, self.op.src_node_uuid, src_node) 411 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: 412 self.needed_locks[locking.LEVEL_NODE].append(self.op.src_node_uuid) 413 if not os.path.isabs(src_path): 414 self.op.src_path = \ 415 utils.PathJoin(pathutils.EXPORT_DIR, src_path) 416 417 self.needed_locks[locking.LEVEL_NODE_RES] = \ 418 CopyLockList(self.needed_locks[locking.LEVEL_NODE]) 419 420 # Optimistically acquire shared group locks (we're reading the 421 # configuration). We can't just call GetInstanceNodeGroups, because the 422 # instance doesn't exist yet. Therefore we lock all node groups of all 423 # nodes we have. 
424 if self.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET: 425 # In the case we lock all nodes for opportunistic allocation, we have no 426 # choice than to lock all groups, because they're allocated before nodes. 427 # This is sad, but true. At least we release all those we don't need in 428 # CheckPrereq later. 429 self.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET 430 else: 431 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 432 list(self.cfg.GetNodeGroupsFromNodes( 433 self.needed_locks[locking.LEVEL_NODE])) 434 self.share_locks[locking.LEVEL_NODEGROUP] = 1
435
 436    def DeclareLocks(self, level):
 437      if level == locking.LEVEL_NODE_RES:
 438        if self.op.opportunistic_locking:
 439          self.needed_locks[locking.LEVEL_NODE_RES] = \
 440            CopyLockList(list(self.owned_locks(locking.LEVEL_NODE)))
441
442 - def _RunAllocator(self):
443 """Run the allocator based on input opcode. 444 445 """ 446 if self.op.opportunistic_locking: 447 # Only consider nodes for which a lock is held 448 node_name_whitelist = self.cfg.GetNodeNames( 449 set(self.owned_locks(locking.LEVEL_NODE)) & 450 set(self.owned_locks(locking.LEVEL_NODE_RES))) 451 else: 452 node_name_whitelist = None 453 454 req = CreateInstanceAllocRequest(self.op, self.disks, 455 self.nics, self.be_full, 456 node_name_whitelist) 457 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 458 459 ial.Run(self.op.iallocator) 460 461 if not ial.success: 462 # When opportunistic locks are used only a temporary failure is generated 463 if self.op.opportunistic_locking: 464 ecode = errors.ECODE_TEMP_NORES 465 self.LogInfo("IAllocator '%s' failed on opportunistically acquired" 466 " nodes: %s", self.op.iallocator, ial.info) 467 else: 468 ecode = errors.ECODE_NORES 469 470 raise errors.OpPrereqError("Can't compute nodes using" 471 " iallocator '%s': %s" % 472 (self.op.iallocator, ial.info), 473 ecode) 474 475 (self.op.pnode_uuid, self.op.pnode) = \ 476 ExpandNodeUuidAndName(self.cfg, None, ial.result[0]) 477 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", 478 self.op.instance_name, self.op.iallocator, 479 utils.CommaJoin(ial.result)) 480 481 assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator" 482 483 if req.RequiredNodes() == 2: 484 (self.op.snode_uuid, self.op.snode) = \ 485 ExpandNodeUuidAndName(self.cfg, None, ial.result[1])
486
487 - def BuildHooksEnv(self):
488 """Build hooks env. 489 490 This runs on master, primary and secondary nodes of the instance. 491 492 """ 493 env = { 494 "ADD_MODE": self.op.mode, 495 } 496 if self.op.mode == constants.INSTANCE_IMPORT: 497 env["SRC_NODE"] = self.op.src_node 498 env["SRC_PATH"] = self.op.src_path 499 env["SRC_IMAGES"] = self.src_images 500 501 env.update(BuildInstanceHookEnv( 502 name=self.op.instance_name, 503 primary_node_name=self.op.pnode, 504 secondary_node_names=self.cfg.GetNodeNames(self.secondaries), 505 status=self.op.start, 506 os_type=self.op.os_type, 507 minmem=self.be_full[constants.BE_MINMEM], 508 maxmem=self.be_full[constants.BE_MAXMEM], 509 vcpus=self.be_full[constants.BE_VCPUS], 510 nics=NICListToTuple(self, self.nics), 511 disk_template=self.op.disk_template, 512 # Note that self.disks here is not a list with objects.Disk 513 # but with dicts as returned by ComputeDisks. 514 disks=self.disks, 515 bep=self.be_full, 516 hvp=self.hv_full, 517 hypervisor_name=self.op.hypervisor, 518 tags=self.op.tags, 519 )) 520 521 return env
522
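The import-specific variables set above come on top of the generic instance description produced by BuildInstanceHookEnv. A rough sketch of the extra keys for an import (node name and paths invented):

    {
      "ADD_MODE": constants.INSTANCE_IMPORT,
      "SRC_NODE": "node3.example.com",
      "SRC_PATH": "/srv/ganeti/export/old-instance.example.com",
      "SRC_IMAGES": ["/srv/ganeti/export/old-instance.example.com/disk0_dump"],
    }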
 523    def BuildHooksNodes(self):
 524      """Build hooks nodes.
 525
 526      """
 527      nl = [self.cfg.GetMasterNode(), self.op.pnode_uuid] + self.secondaries
 528      return nl, nl
529
530 - def _ReadExportInfo(self):
531 """Reads the export information from disk. 532 533 It will override the opcode source node and path with the actual 534 information, if these two were not specified before. 535 536 @return: the export information 537 538 """ 539 assert self.op.mode == constants.INSTANCE_IMPORT 540 541 if self.op.src_node_uuid is None: 542 locked_nodes = self.owned_locks(locking.LEVEL_NODE) 543 exp_list = self.rpc.call_export_list(locked_nodes) 544 found = False 545 for node_uuid in exp_list: 546 if exp_list[node_uuid].fail_msg: 547 continue 548 if self.op.src_path in exp_list[node_uuid].payload: 549 found = True 550 self.op.src_node = self.cfg.GetNodeInfo(node_uuid).name 551 self.op.src_node_uuid = node_uuid 552 self.op.src_path = utils.PathJoin(pathutils.EXPORT_DIR, 553 self.op.src_path) 554 break 555 if not found: 556 raise errors.OpPrereqError("No export found for relative path %s" % 557 self.op.src_path, errors.ECODE_INVAL) 558 559 CheckNodeOnline(self, self.op.src_node_uuid) 560 result = self.rpc.call_export_info(self.op.src_node_uuid, self.op.src_path) 561 result.Raise("No export or invalid export found in dir %s" % 562 self.op.src_path) 563 564 export_info = objects.SerializableConfigParser.Loads(str(result.payload)) 565 if not export_info.has_section(constants.INISECT_EXP): 566 raise errors.ProgrammerError("Corrupted export config", 567 errors.ECODE_ENVIRON) 568 569 ei_version = export_info.get(constants.INISECT_EXP, "version") 570 if int(ei_version) != constants.EXPORT_VERSION: 571 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % 572 (ei_version, constants.EXPORT_VERSION), 573 errors.ECODE_ENVIRON) 574 return export_info
575
576 - def _ReadExportParams(self, einfo):
577 """Use export parameters as defaults. 578 579 In case the opcode doesn't specify (as in override) some instance 580 parameters, then try to use them from the export information, if 581 that declares them. 582 583 """ 584 self.op.os_type = einfo.get(constants.INISECT_EXP, "os") 585 586 if not self.op.disks: 587 disks = [] 588 # TODO: import the disk iv_name too 589 for idx in range(constants.MAX_DISKS): 590 if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx): 591 disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) 592 disk_name = einfo.get(constants.INISECT_INS, "disk%d_name" % idx) 593 disk = { 594 constants.IDISK_SIZE: disk_sz, 595 constants.IDISK_NAME: disk_name 596 } 597 disks.append(disk) 598 self.op.disks = disks 599 if not disks and self.op.disk_template != constants.DT_DISKLESS: 600 raise errors.OpPrereqError("No disk info specified and the export" 601 " is missing the disk information", 602 errors.ECODE_INVAL) 603 604 if not self.op.nics: 605 nics = [] 606 for idx in range(constants.MAX_NICS): 607 if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx): 608 ndict = {} 609 for name in [constants.INIC_IP, 610 constants.INIC_MAC, constants.INIC_NAME]: 611 nic_param_name = "nic%d_%s" % (idx, name) 612 if einfo.has_option(constants.INISECT_INS, nic_param_name): 613 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) 614 ndict[name] = v 615 network = einfo.get(constants.INISECT_INS, 616 "nic%d_%s" % (idx, constants.INIC_NETWORK)) 617 # in case network is given link and mode are inherited 618 # from nodegroup's netparams and thus should not be passed here 619 if network: 620 ndict[constants.INIC_NETWORK] = network 621 else: 622 for name in list(constants.NICS_PARAMETERS): 623 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) 624 ndict[name] = v 625 nics.append(ndict) 626 else: 627 break 628 self.op.nics = nics 629 630 if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): 631 self.op.tags = einfo.get(constants.INISECT_INS, "tags").split() 632 633 if (self.op.hypervisor is None and 634 einfo.has_option(constants.INISECT_INS, "hypervisor")): 635 self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor") 636 637 if einfo.has_section(constants.INISECT_HYP): 638 # use the export parameters but do not override the ones 639 # specified by the user 640 for name, value in einfo.items(constants.INISECT_HYP): 641 if name not in self.op.hvparams: 642 self.op.hvparams[name] = value 643 644 if einfo.has_section(constants.INISECT_BEP): 645 # use the parameters, without overriding 646 for name, value in einfo.items(constants.INISECT_BEP): 647 if name not in self.op.beparams: 648 self.op.beparams[name] = value 649 # Compatibility for the old "memory" be param 650 if name == constants.BE_MEMORY: 651 if constants.BE_MAXMEM not in self.op.beparams: 652 self.op.beparams[constants.BE_MAXMEM] = value 653 if constants.BE_MINMEM not in self.op.beparams: 654 self.op.beparams[constants.BE_MINMEM] = value 655 else: 656 # try to read the parameters old style, from the main section 657 for name in constants.BES_PARAMETERS: 658 if (name not in self.op.beparams and 659 einfo.has_option(constants.INISECT_INS, name)): 660 self.op.beparams[name] = einfo.get(constants.INISECT_INS, name) 661 662 if einfo.has_section(constants.INISECT_OSP): 663 # use the parameters, without overriding 664 for name, value in einfo.items(constants.INISECT_OSP): 665 if name not in self.op.osparams: 666 self.op.osparams[name] = value 667 668 if 
einfo.has_section(constants.INISECT_OSP_PRIVATE): 669 # use the parameters, without overriding 670 for name, value in einfo.items(constants.INISECT_OSP_PRIVATE): 671 if name not in self.op.osparams_private: 672 self.op.osparams_private[name] = serializer.Private(value, descr=name)
673
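For orientation, the export information consumed above is an INI-style file; the option names queried by _ReadExportParams imply a layout roughly like the following sketch (section names are assumed to match the INISECT_* constants, and all values are invented):

    [export]
    version = 0
    os = debootstrap+default

    [instance]
    name = old-name.example.com
    disk0_size = 10240
    disk0_name = disk-0
    nic0_mac = aa:00:00:12:34:56
    nic0_ip = 192.0.2.10
    nic0_network = net-a
    hypervisor = kvm
    tags = web prod

Further sections (INISECT_HYP, INISECT_BEP, INISECT_OSP, INISECT_OSP_PRIVATE) may supply hypervisor, backend and OS parameter defaults that the opcode does not override.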
 674    def _RevertToDefaults(self, cluster):
 675      """Revert the instance parameters to the default values.
 676
 677      """
 678      # hvparams
 679      hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
 680      for name in self.op.hvparams.keys():
 681        if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
 682          del self.op.hvparams[name]
 683      # beparams
 684      be_defs = cluster.SimpleFillBE({})
 685      for name in self.op.beparams.keys():
 686        if name in be_defs and be_defs[name] == self.op.beparams[name]:
 687          del self.op.beparams[name]
 688      # nic params
 689      nic_defs = cluster.SimpleFillNIC({})
 690      for nic in self.op.nics:
 691        for name in constants.NICS_PARAMETERS:
 692          if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
 693            del nic[name]
 694      # osparams
 695      os_defs = cluster.SimpleFillOS(self.op.os_type, {})
 696      for name in self.op.osparams.keys():
 697        if name in os_defs and os_defs[name] == self.op.osparams[name]:
 698          del self.op.osparams[name]
 699
 700      os_defs_ = cluster.SimpleFillOS(self.op.os_type, {},
 701                                      os_params_private={})
 702      for name in self.op.osparams_private.keys():
 703        if name in os_defs_ and os_defs_[name] == self.op.osparams_private[name]:
 704          del self.op.osparams_private[name]
705
707 """Set nodes as in the forthcoming instance 708 709 """ 710 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 711 inst = self.cfg.GetInstanceInfo(uuid) 712 self.op.pnode_uuid = inst.primary_node 713 self.op.pnode = self.cfg.GetNodeName(inst.primary_node) 714 sec_nodes = self.cfg.GetInstanceSecondaryNodes(uuid) 715 node_names = [self.op.pnode] 716 if sec_nodes: 717 self.op.snode_uuid = sec_nodes[0] 718 self.op.snode = self.cfg.GetNodeName(sec_nodes[0]) 719 node_names.append(self.op.snode) 720 self.LogInfo("Nodes of instance %s: %s", name, node_names)
721
722 - def CheckPrereq(self): # pylint: disable=R0914
723 """Check prerequisites. 724 725 """ 726 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) 727 728 if self.op.commit: 729 # Check that the instance is still on the cluster, forthcoming, and 730 # still resides on the nodes we acquired. 731 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 732 if uuid is None: 733 raise errors.OpPrereqError("Instance %s disappeared from the cluster" 734 " while waiting for locks" 735 % (self.op.instance_name,), 736 errors.ECODE_STATE) 737 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 738 raise errors.OpPrereqError("Instance %s (with uuid %s) is no longer" 739 " forthcoming" % (name, uuid), 740 errors.ECODE_STATE) 741 required_nodes = self.cfg.GetInstanceNodes(uuid) 742 if not owned_nodes.issuperset(required_nodes): 743 raise errors.OpPrereqError("Forthcoming instance %s nodes changed" 744 " since locks were acquired; retry the" 745 " operation" % self.op.instance_name, 746 errors.ECODE_STATE) 747 else: 748 CheckInstanceExistence(self, self.op.instance_name) 749 750 # Check that the optimistically acquired groups are correct wrt the 751 # acquired nodes 752 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) 753 cur_groups = list(self.cfg.GetNodeGroupsFromNodes(owned_nodes)) 754 if not owned_groups.issuperset(cur_groups): 755 raise errors.OpPrereqError("New instance %s's node groups changed since" 756 " locks were acquired, current groups are" 757 " are '%s', owning groups '%s'; retry the" 758 " operation" % 759 (self.op.instance_name, 760 utils.CommaJoin(cur_groups), 761 utils.CommaJoin(owned_groups)), 762 errors.ECODE_STATE) 763 764 self.instance_file_storage_dir = CalculateFileStorageDir( 765 self.op.disk_template, self.cfg, self.op.instance_name, 766 self.op.file_storage_dir) 767 768 if self.op.mode == constants.INSTANCE_IMPORT: 769 export_info = self._ReadExportInfo() 770 self._ReadExportParams(export_info) 771 self._old_instance_name = export_info.get(constants.INISECT_INS, "name") 772 else: 773 self._old_instance_name = None 774 775 if (not self.cfg.GetVGName() and 776 self.op.disk_template not in constants.DTS_NOT_LVM): 777 raise errors.OpPrereqError("Cluster does not support lvm-based" 778 " instances", errors.ECODE_STATE) 779 780 if (self.op.hypervisor is None or 781 self.op.hypervisor == constants.VALUE_AUTO): 782 self.op.hypervisor = self.cfg.GetHypervisorType() 783 784 cluster = self.cfg.GetClusterInfo() 785 enabled_hvs = cluster.enabled_hypervisors 786 if self.op.hypervisor not in enabled_hvs: 787 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" 788 " cluster (%s)" % 789 (self.op.hypervisor, ",".join(enabled_hvs)), 790 errors.ECODE_STATE) 791 792 # Check tag validity 793 for tag in self.op.tags: 794 objects.TaggableObject.ValidateTag(tag) 795 796 # check hypervisor parameter syntax (locally) 797 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) 798 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, 799 self.op.hvparams) 800 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor) 801 hv_type.CheckParameterSyntax(filled_hvp) 802 self.hv_full = filled_hvp 803 # check that we don't specify global parameters on an instance 804 CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor", 805 "instance", "cluster") 806 807 # fill and remember the beparams dict 808 self.be_full = ComputeFullBeParams(self.op, cluster) 809 810 # build os parameters 811 if self.op.osparams_private is None: 812 self.op.osparams_private = 
serializer.PrivateDict() 813 if self.op.osparams_secret is None: 814 self.op.osparams_secret = serializer.PrivateDict() 815 816 self.os_full = cluster.SimpleFillOS( 817 self.op.os_type, 818 self.op.osparams, 819 os_params_private=self.op.osparams_private, 820 os_params_secret=self.op.osparams_secret 821 ) 822 823 # now that hvp/bep are in final format, let's reset to defaults, 824 # if told to do so 825 if self.op.identify_defaults: 826 self._RevertToDefaults(cluster) 827 828 # NIC buildup 829 self.nics = ComputeNics(self.op, cluster, self.check_ip, self.cfg, 830 self.proc.GetECId()) 831 832 # disk checks/pre-build 833 default_vg = self.cfg.GetVGName() 834 self.disks = ComputeDisks(self.op.disks, self.op.disk_template, default_vg) 835 836 if self.op.mode == constants.INSTANCE_IMPORT: 837 disk_images = [] 838 for idx in range(len(self.disks)): 839 option = "disk%d_dump" % idx 840 if export_info.has_option(constants.INISECT_INS, option): 841 # FIXME: are the old os-es, disk sizes, etc. useful? 842 export_name = export_info.get(constants.INISECT_INS, option) 843 image = utils.PathJoin(self.op.src_path, export_name) 844 disk_images.append(image) 845 else: 846 disk_images.append(False) 847 848 self.src_images = disk_images 849 850 if self.op.instance_name == self._old_instance_name: 851 for idx, nic in enumerate(self.nics): 852 if nic.mac == constants.VALUE_AUTO: 853 nic_mac_ini = "nic%d_mac" % idx 854 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) 855 856 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT 857 858 # ip ping checks (we use the same ip that was resolved in ExpandNames) 859 if self.op.ip_check: 860 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): 861 raise errors.OpPrereqError("IP %s of instance %s already in use" % 862 (self.check_ip, self.op.instance_name), 863 errors.ECODE_NOTUNIQUE) 864 865 #### mac address generation 866 # By generating here the mac address both the allocator and the hooks get 867 # the real final mac address rather than the 'auto' or 'generate' value. 868 # There is a race condition between the generation and the instance object 869 # creation, which means that we know the mac is valid now, but we're not 870 # sure it will be when we actually add the instance. If things go bad 871 # adding the instance will abort because of a duplicate mac, and the 872 # creation job will fail. 
873 for nic in self.nics: 874 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): 875 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) 876 877 #### allocator run 878 879 if self.op.iallocator is not None: 880 if self.op.commit: 881 self._GetNodesFromForthcomingInstance() 882 else: 883 self._RunAllocator() 884 885 # Release all unneeded node locks 886 keep_locks = filter(None, [self.op.pnode_uuid, self.op.snode_uuid, 887 self.op.src_node_uuid]) 888 ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) 889 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) 890 # Release all unneeded group locks 891 ReleaseLocks(self, locking.LEVEL_NODEGROUP, 892 keep=self.cfg.GetNodeGroupsFromNodes(keep_locks)) 893 894 assert (self.owned_locks(locking.LEVEL_NODE) == 895 self.owned_locks(locking.LEVEL_NODE_RES)), \ 896 "Node locks differ from node resource locks" 897 898 #### node related checks 899 900 # check primary node 901 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode_uuid) 902 assert self.pnode is not None, \ 903 "Cannot retrieve locked node %s" % self.op.pnode_uuid 904 if pnode.offline: 905 raise errors.OpPrereqError("Cannot use offline primary node '%s'" % 906 pnode.name, errors.ECODE_STATE) 907 if pnode.drained: 908 raise errors.OpPrereqError("Cannot use drained primary node '%s'" % 909 pnode.name, errors.ECODE_STATE) 910 if not pnode.vm_capable: 911 raise errors.OpPrereqError("Cannot use non-vm_capable primary node" 912 " '%s'" % pnode.name, errors.ECODE_STATE) 913 914 self.secondaries = [] 915 916 # Fill in any IPs from IP pools. This must happen here, because we need to 917 # know the nic's primary node, as specified by the iallocator 918 for idx, nic in enumerate(self.nics): 919 net_uuid = nic.network 920 if net_uuid is not None: 921 nobj = self.cfg.GetNetwork(net_uuid) 922 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.uuid) 923 if netparams is None: 924 raise errors.OpPrereqError("No netparams found for network" 925 " %s. 
Probably not connected to" 926 " node's %s nodegroup" % 927 (nobj.name, self.pnode.name), 928 errors.ECODE_INVAL) 929 self.LogInfo("NIC/%d inherits netparams %s" % 930 (idx, netparams.values())) 931 nic.nicparams = dict(netparams) 932 if nic.ip is not None: 933 if nic.ip.lower() == constants.NIC_IP_POOL: 934 try: 935 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId()) 936 except errors.ReservationError: 937 raise errors.OpPrereqError("Unable to get a free IP for NIC %d" 938 " from the address pool" % idx, 939 errors.ECODE_STATE) 940 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name) 941 else: 942 try: 943 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId(), 944 check=self.op.conflicts_check) 945 except errors.ReservationError: 946 raise errors.OpPrereqError("IP address %s already in use" 947 " or does not belong to network %s" % 948 (nic.ip, nobj.name), 949 errors.ECODE_NOTUNIQUE) 950 951 # net is None, ip None or given 952 elif self.op.conflicts_check: 953 CheckForConflictingIp(self, nic.ip, self.pnode.uuid) 954 955 # mirror node verification 956 if self.op.disk_template in constants.DTS_INT_MIRROR: 957 if self.op.snode_uuid == pnode.uuid: 958 raise errors.OpPrereqError("The secondary node cannot be the" 959 " primary node", errors.ECODE_INVAL) 960 CheckNodeOnline(self, self.op.snode_uuid) 961 CheckNodeNotDrained(self, self.op.snode_uuid) 962 CheckNodeVmCapable(self, self.op.snode_uuid) 963 self.secondaries.append(self.op.snode_uuid) 964 965 snode = self.cfg.GetNodeInfo(self.op.snode_uuid) 966 if pnode.group != snode.group: 967 self.LogWarning("The primary and secondary nodes are in two" 968 " different node groups; the disk parameters" 969 " from the first disk's node group will be" 970 " used") 971 972 nodes = [pnode] 973 if self.op.disk_template in constants.DTS_INT_MIRROR: 974 nodes.append(snode) 975 has_es = lambda n: IsExclusiveStorageEnabledNode(self.cfg, n) 976 excl_stor = compat.any(map(has_es, nodes)) 977 if excl_stor and not self.op.disk_template in constants.DTS_EXCL_STORAGE: 978 raise errors.OpPrereqError("Disk template %s not supported with" 979 " exclusive storage" % self.op.disk_template, 980 errors.ECODE_STATE) 981 for disk in self.disks: 982 CheckSpindlesExclusiveStorage(disk, excl_stor, True) 983 984 node_uuids = [pnode.uuid] + self.secondaries 985 986 if not self.adopt_disks: 987 if self.op.disk_template == constants.DT_RBD: 988 # _CheckRADOSFreeSpace() is just a placeholder. 989 # Any function that checks prerequisites can be placed here. 990 # Check if there is enough space on the RADOS cluster. 
991 CheckRADOSFreeSpace() 992 elif self.op.disk_template == constants.DT_EXT: 993 # FIXME: Function that checks prereqs if needed 994 pass 995 elif self.op.disk_template in constants.DTS_LVM: 996 # Check lv size requirements, if not adopting 997 req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks) 998 CheckNodesFreeDiskPerVG(self, node_uuids, req_sizes) 999 else: 1000 # FIXME: add checks for other, non-adopting, non-lvm disk templates 1001 pass 1002 1003 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data 1004 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], 1005 disk[constants.IDISK_ADOPT]) 1006 for disk in self.disks]) 1007 if len(all_lvs) != len(self.disks): 1008 raise errors.OpPrereqError("Duplicate volume names given for adoption", 1009 errors.ECODE_INVAL) 1010 for lv_name in all_lvs: 1011 try: 1012 # FIXME: lv_name here is "vg/lv" need to ensure that other calls 1013 # to ReserveLV uses the same syntax 1014 self.cfg.ReserveLV(lv_name, self.proc.GetECId()) 1015 except errors.ReservationError: 1016 raise errors.OpPrereqError("LV named %s used by another instance" % 1017 lv_name, errors.ECODE_NOTUNIQUE) 1018 1019 vg_names = self.rpc.call_vg_list([pnode.uuid])[pnode.uuid] 1020 vg_names.Raise("Cannot get VG information from node %s" % pnode.name, 1021 prereq=True) 1022 1023 node_lvs = self.rpc.call_lv_list([pnode.uuid], 1024 vg_names.payload.keys())[pnode.uuid] 1025 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name, 1026 prereq=True) 1027 node_lvs = node_lvs.payload 1028 1029 delta = all_lvs.difference(node_lvs.keys()) 1030 if delta: 1031 raise errors.OpPrereqError("Missing logical volume(s): %s" % 1032 utils.CommaJoin(delta), 1033 errors.ECODE_INVAL) 1034 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]] 1035 if online_lvs: 1036 raise errors.OpPrereqError("Online logical volumes found, cannot" 1037 " adopt: %s" % utils.CommaJoin(online_lvs), 1038 errors.ECODE_STATE) 1039 # update the size of disk based on what is found 1040 for dsk in self.disks: 1041 dsk[constants.IDISK_SIZE] = \ 1042 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG], 1043 dsk[constants.IDISK_ADOPT])][0])) 1044 1045 elif self.op.disk_template == constants.DT_BLOCK: 1046 # Normalize and de-duplicate device paths 1047 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT]) 1048 for disk in self.disks]) 1049 if len(all_disks) != len(self.disks): 1050 raise errors.OpPrereqError("Duplicate disk names given for adoption", 1051 errors.ECODE_INVAL) 1052 baddisks = [d for d in all_disks 1053 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)] 1054 if baddisks: 1055 raise errors.OpPrereqError("Device node(s) %s lie outside %s and" 1056 " cannot be adopted" % 1057 (utils.CommaJoin(baddisks), 1058 constants.ADOPTABLE_BLOCKDEV_ROOT), 1059 errors.ECODE_INVAL) 1060 1061 node_disks = self.rpc.call_bdev_sizes([pnode.uuid], 1062 list(all_disks))[pnode.uuid] 1063 node_disks.Raise("Cannot get block device information from node %s" % 1064 pnode.name, prereq=True) 1065 node_disks = node_disks.payload 1066 delta = all_disks.difference(node_disks.keys()) 1067 if delta: 1068 raise errors.OpPrereqError("Missing block device(s): %s" % 1069 utils.CommaJoin(delta), 1070 errors.ECODE_INVAL) 1071 for dsk in self.disks: 1072 dsk[constants.IDISK_SIZE] = \ 1073 int(float(node_disks[dsk[constants.IDISK_ADOPT]])) 1074 1075 # Check disk access param to be compatible with specified hypervisor 1076 node_info = self.cfg.GetNodeInfo(self.op.pnode_uuid) 1077 node_group = 
self.cfg.GetNodeGroup(node_info.group) 1078 group_disk_params = self.cfg.GetGroupDiskParams(node_group) 1079 group_access_type = group_disk_params[self.op.disk_template].get( 1080 constants.RBD_ACCESS, constants.DISK_KERNELSPACE 1081 ) 1082 for dsk in self.disks: 1083 access_type = dsk.get(constants.IDISK_ACCESS, group_access_type) 1084 if not IsValidDiskAccessModeCombination(self.op.hypervisor, 1085 self.op.disk_template, 1086 access_type): 1087 raise errors.OpPrereqError("Selected hypervisor (%s) cannot be" 1088 " used with %s disk access param" % 1089 (self.op.hypervisor, access_type), 1090 errors.ECODE_STATE) 1091 1092 # Verify instance specs 1093 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) 1094 ispec = { 1095 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), 1096 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), 1097 constants.ISPEC_DISK_COUNT: len(self.disks), 1098 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE] 1099 for disk in self.disks], 1100 constants.ISPEC_NIC_COUNT: len(self.nics), 1101 constants.ISPEC_SPINDLE_USE: spindle_use, 1102 } 1103 1104 group_info = self.cfg.GetNodeGroup(pnode.group) 1105 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) 1106 disk_types = [self.op.disk_template] * len(self.disks) 1107 res = ComputeIPolicyInstanceSpecViolation(ipolicy, ispec, disk_types) 1108 if not self.op.ignore_ipolicy and res: 1109 msg = ("Instance allocation to group %s (%s) violates policy: %s" % 1110 (pnode.group, group_info.name, utils.CommaJoin(res))) 1111 raise errors.OpPrereqError(msg, errors.ECODE_INVAL) 1112 1113 CheckHVParams(self, node_uuids, self.op.hypervisor, self.op.hvparams) 1114 1115 CheckOSParams(self, True, node_uuids, self.op.os_type, self.os_full, 1116 self.op.force_variant) 1117 1118 CheckNicsBridgesExist(self, self.nics, self.pnode.uuid) 1119 1120 CheckCompressionTool(self, self.op.compress) 1121 1122 #TODO: _CheckExtParams (remotely) 1123 # Check parameters for extstorage 1124 1125 # memory check on primary node 1126 #TODO(dynmem): use MINMEM for checking 1127 if self.op.start: 1128 hvfull = objects.FillDict(cluster.hvparams.get(self.op.hypervisor, {}), 1129 self.op.hvparams) 1130 CheckNodeFreeMemory(self, self.pnode.uuid, 1131 "creating instance %s" % self.op.instance_name, 1132 self.be_full[constants.BE_MAXMEM], 1133 self.op.hypervisor, hvfull) 1134 1135 self.dry_run_result = list(node_uuids)
1136
1137 - def _RemoveDegradedDisks(self, feedback_fn, disk_abort, instance):
1138 """Removes degraded disks and instance. 1139 1140 It optionally checks whether disks are degraded. If the disks are 1141 degraded, they are removed and the instance is also removed from 1142 the configuration. 1143 1144 If L{disk_abort} is True, then the disks are considered degraded 1145 and removed, and the instance is removed from the configuration. 1146 1147 If L{disk_abort} is False, then it first checks whether disks are 1148 degraded and, if so, it removes the disks and the instance is 1149 removed from the configuration. 1150 1151 @type feedback_fn: callable 1152 @param feedback_fn: function used send feedback back to the caller 1153 1154 @type disk_abort: boolean 1155 @param disk_abort: 1156 True if disks are degraded, False to first check if disks are 1157 degraded 1158 @type instance: L{objects.Instance} 1159 @param instance: instance containing the disks to check 1160 1161 @rtype: NoneType 1162 @return: None 1163 @raise errors.OpPrereqError: if disks are degraded 1164 1165 """ 1166 disk_info = self.cfg.GetInstanceDisks(instance.uuid) 1167 if disk_abort: 1168 pass 1169 elif self.op.wait_for_sync: 1170 disk_abort = not WaitForSync(self, instance) 1171 elif utils.AnyDiskOfType(disk_info, constants.DTS_INT_MIRROR): 1172 # make sure the disks are not degraded (still sync-ing is ok) 1173 feedback_fn("* checking mirrors status") 1174 disk_abort = not WaitForSync(self, instance, oneshot=True) 1175 else: 1176 disk_abort = False 1177 1178 if disk_abort: 1179 RemoveDisks(self, instance) 1180 for disk_uuid in instance.disks: 1181 self.cfg.RemoveInstanceDisk(instance.uuid, disk_uuid) 1182 self.cfg.RemoveInstance(instance.uuid) 1183 raise errors.OpExecError("There are some degraded disks for" 1184 " this instance")
1185
1186 - def RunOsScripts(self, feedback_fn, iobj):
1187 """Run OS scripts 1188 1189 If necessary, disks are paused. It handles instance create, 1190 import, and remote import. 1191 1192 @type feedback_fn: callable 1193 @param feedback_fn: function used send feedback back to the caller 1194 1195 @type iobj: L{objects.Instance} 1196 @param iobj: instance object 1197 1198 """ 1199 if iobj.disks and not self.adopt_disks: 1200 disks = self.cfg.GetInstanceDisks(iobj.uuid) 1201 if self.op.mode == constants.INSTANCE_CREATE: 1202 os_image = objects.GetOSImage(self.op.osparams) 1203 1204 if os_image is None and not self.op.no_install: 1205 pause_sync = (not self.op.wait_for_sync and 1206 utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)) 1207 if pause_sync: 1208 feedback_fn("* pausing disk sync to install instance OS") 1209 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1210 (disks, iobj), 1211 True) 1212 for idx, success in enumerate(result.payload): 1213 if not success: 1214 logging.warn("pause-sync of instance %s for disk %d failed", 1215 self.op.instance_name, idx) 1216 1217 feedback_fn("* running the instance OS create scripts...") 1218 # FIXME: pass debug option from opcode to backend 1219 os_add_result = \ 1220 self.rpc.call_instance_os_add(self.pnode.uuid, 1221 (iobj, self.op.osparams_secret), 1222 False, 1223 self.op.debug_level) 1224 if pause_sync: 1225 feedback_fn("* resuming disk sync") 1226 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1227 (disks, iobj), 1228 False) 1229 for idx, success in enumerate(result.payload): 1230 if not success: 1231 logging.warn("resume-sync of instance %s for disk %d failed", 1232 self.op.instance_name, idx) 1233 1234 os_add_result.Raise("Could not add os for instance %s" 1235 " on node %s" % (self.op.instance_name, 1236 self.pnode.name)) 1237 1238 else: 1239 if self.op.mode == constants.INSTANCE_IMPORT: 1240 feedback_fn("* running the instance OS import scripts...") 1241 1242 transfers = [] 1243 1244 for idx, image in enumerate(self.src_images): 1245 if not image: 1246 continue 1247 1248 if iobj.os: 1249 dst_io = constants.IEIO_SCRIPT 1250 dst_ioargs = ((disks[idx], iobj), idx) 1251 else: 1252 dst_io = constants.IEIO_RAW_DISK 1253 dst_ioargs = (disks[idx], iobj) 1254 1255 # FIXME: pass debug option from opcode to backend 1256 dt = masterd.instance.DiskTransfer("disk/%s" % idx, 1257 constants.IEIO_FILE, (image, ), 1258 dst_io, dst_ioargs, 1259 None) 1260 transfers.append(dt) 1261 1262 import_result = \ 1263 masterd.instance.TransferInstanceData(self, feedback_fn, 1264 self.op.src_node_uuid, 1265 self.pnode.uuid, 1266 self.pnode.secondary_ip, 1267 self.op.compress, 1268 iobj, transfers) 1269 if not compat.all(import_result): 1270 self.LogWarning("Some disks for instance %s on node %s were not" 1271 " imported successfully" % (self.op.instance_name, 1272 self.pnode.name)) 1273 1274 rename_from = self._old_instance_name 1275 1276 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 1277 feedback_fn("* preparing remote import...") 1278 # The source cluster will stop the instance before attempting to make 1279 # a connection. In some cases stopping an instance can take a long 1280 # time, hence the shutdown timeout is added to the connection 1281 # timeout. 
1282 connect_timeout = (constants.RIE_CONNECT_TIMEOUT + 1283 self.op.source_shutdown_timeout) 1284 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) 1285 1286 assert iobj.primary_node == self.pnode.uuid 1287 disk_results = \ 1288 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, 1289 self.source_x509_ca, 1290 self._cds, self.op.compress, timeouts) 1291 if not compat.all(disk_results): 1292 # TODO: Should the instance still be started, even if some disks 1293 # failed to import (valid for local imports, too)? 1294 self.LogWarning("Some disks for instance %s on node %s were not" 1295 " imported successfully" % (self.op.instance_name, 1296 self.pnode.name)) 1297 1298 rename_from = self.source_instance_name 1299 1300 else: 1301 # also checked in the prereq part 1302 raise errors.ProgrammerError("Unknown OS initialization mode '%s'" 1303 % self.op.mode) 1304 1305 assert iobj.name == self.op.instance_name 1306 1307 # Run rename script on newly imported instance 1308 if iobj.os: 1309 feedback_fn("Running rename script for %s" % self.op.instance_name) 1310 result = self.rpc.call_instance_run_rename(self.pnode.uuid, iobj, 1311 rename_from, 1312 self.op.debug_level) 1313 result.Warn("Failed to run rename script for %s on node %s" % 1314 (self.op.instance_name, self.pnode.name), self.LogWarning)
1315
1316 - def GetOsInstallPackageEnvironment(self, instance, script):
1317 """Returns the OS scripts environment for the helper VM 1318 1319 @type instance: L{objects.Instance} 1320 @param instance: instance for which the OS scripts are run 1321 1322 @type script: string 1323 @param script: script to run (e.g., 1324 constants.OS_SCRIPT_CREATE_UNTRUSTED) 1325 1326 @rtype: dict of string to string 1327 @return: OS scripts environment for the helper VM 1328 1329 """ 1330 env = {"OS_SCRIPT": script} 1331 1332 # We pass only the instance's disks, not the helper VM's disks. 1333 if instance.hypervisor == constants.HT_KVM: 1334 prefix = "/dev/vd" 1335 elif instance.hypervisor in [constants.HT_XEN_PVM, constants.HT_XEN_HVM]: 1336 prefix = "/dev/xvd" 1337 else: 1338 raise errors.OpExecError("Cannot run OS scripts in a virtualized" 1339 " environment for hypervisor '%s'" 1340 % instance.hypervisor) 1341 1342 num_disks = len(self.cfg.GetInstanceDisks(instance.uuid)) 1343 1344 for idx, disk_label in enumerate(utils.GetDiskLabels(prefix, num_disks + 1, 1345 start=1)): 1346 env["DISK_%d_PATH" % idx] = disk_label 1347 1348 return env
1349
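As a concrete illustration of the environment built above: for a KVM instance with two disks, and assuming utils.GetDiskLabels(prefix, num_disks + 1, start=1) yields labels starting at the second device letter (the first one being taken by the helper VM's own boot disk), the result would look roughly like:

    {
      "OS_SCRIPT": constants.OS_SCRIPT_CREATE_UNTRUSTED,
      "DISK_0_PATH": "/dev/vdb",
      "DISK_1_PATH": "/dev/vdc",
    }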
1350 - def UpdateInstanceOsInstallPackage(self, feedback_fn, instance, override_env):
1351 """Updates the OS parameter 'os-install-package' for an instance. 1352 1353 The OS install package is an archive containing an OS definition 1354 and a file containing the environment variables needed to run the 1355 OS scripts. 1356 1357 The OS install package is served by the metadata daemon to the 1358 instances, so the OS scripts can run inside the virtualized 1359 environment. 1360 1361 @type feedback_fn: callable 1362 @param feedback_fn: function used send feedback back to the caller 1363 1364 @type instance: L{objects.Instance} 1365 @param instance: instance for which the OS parameter 1366 'os-install-package' is updated 1367 1368 @type override_env: dict of string to string 1369 @param override_env: if supplied, it overrides the environment of 1370 the export OS scripts archive 1371 1372 """ 1373 if "os-install-package" in instance.osparams: 1374 feedback_fn("Using OS install package '%s'" % 1375 instance.osparams["os-install-package"]) 1376 else: 1377 result = self.rpc.call_os_export(instance.primary_node, instance, 1378 override_env) 1379 result.Raise("Could not export OS '%s'" % instance.os) 1380 instance.osparams["os-install-package"] = result.payload 1381 1382 feedback_fn("Created OS install package '%s'" % result.payload)
1383
1384 - def RunOsScriptsVirtualized(self, feedback_fn, instance):
1385 """Runs the OS scripts inside a safe virtualized environment. 1386 1387 The virtualized environment reuses the instance and temporarily 1388 creates a disk onto which the image of the helper VM is dumped. 1389 The temporary disk is used to boot the helper VM. The OS scripts 1390 are passed to the helper VM through the metadata daemon and the OS 1391 install package. 1392 1393 @type feedback_fn: callable 1394 @param feedback_fn: function used send feedback back to the caller 1395 1396 @type instance: L{objects.Instance} 1397 @param instance: instance for which the OS scripts must be run 1398 inside the virtualized environment 1399 1400 """ 1401 install_image = self.cfg.GetInstallImage() 1402 1403 if not install_image: 1404 raise errors.OpExecError("Cannot create install instance because an" 1405 " install image has not been specified") 1406 1407 env = self.GetOsInstallPackageEnvironment( 1408 instance, 1409 constants.OS_SCRIPT_CREATE_UNTRUSTED) 1410 self.UpdateInstanceOsInstallPackage(feedback_fn, instance, env) 1411 UpdateMetadata(feedback_fn, self.rpc, instance, 1412 osparams_private=self.op.osparams_private, 1413 osparams_secret=self.op.osparams_secret) 1414 1415 RunWithHelperVM(self, instance, install_image, 1416 self.op.helper_startup_timeout, 1417 self.op.helper_shutdown_timeout, 1418 log_prefix="Running OS create script", 1419 feedback_fn=feedback_fn)
1420
1421 - def Exec(self, feedback_fn):
1422 """Create and add the instance to the cluster. 1423 1424 """ 1425 assert not (self.owned_locks(locking.LEVEL_NODE_RES) - 1426 self.owned_locks(locking.LEVEL_NODE)), \ 1427 "Node locks differ from node resource locks" 1428 1429 ht_kind = self.op.hypervisor 1430 if ht_kind in constants.HTS_REQ_PORT: 1431 network_port = self.cfg.AllocatePort() 1432 else: 1433 network_port = None 1434 1435 if self.op.commit: 1436 (instance_uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 1437 else: 1438 instance_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) 1439 1440 # This is ugly but we got a chicken-egg problem here 1441 # We can only take the group disk parameters, as the instance 1442 # has no disks yet (we are generating them right here). 1443 nodegroup = self.cfg.GetNodeGroup(self.pnode.group) 1444 1445 if self.op.commit: 1446 disks = self.cfg.GetInstanceDisks(instance_uuid) 1447 CommitDisks(disks) 1448 else: 1449 disks = GenerateDiskTemplate(self, 1450 self.op.disk_template, 1451 instance_uuid, self.pnode.uuid, 1452 self.secondaries, 1453 self.disks, 1454 self.instance_file_storage_dir, 1455 self.op.file_driver, 1456 0, 1457 feedback_fn, 1458 self.cfg.GetGroupDiskParams(nodegroup), 1459 forthcoming=self.op.forthcoming) 1460 1461 if self.op.os_type is None: 1462 os_type = "" 1463 else: 1464 os_type = self.op.os_type 1465 1466 iobj = objects.Instance(name=self.op.instance_name, 1467 uuid=instance_uuid, 1468 os=os_type, 1469 primary_node=self.pnode.uuid, 1470 nics=self.nics, disks=[], 1471 disk_template=self.op.disk_template, 1472 disks_active=False, 1473 admin_state=constants.ADMINST_DOWN, 1474 admin_state_source=constants.ADMIN_SOURCE, 1475 network_port=network_port, 1476 beparams=self.op.beparams, 1477 hvparams=self.op.hvparams, 1478 hypervisor=self.op.hypervisor, 1479 osparams=self.op.osparams, 1480 osparams_private=self.op.osparams_private, 1481 forthcoming=self.op.forthcoming, 1482 ) 1483 1484 if self.op.tags: 1485 for tag in self.op.tags: 1486 iobj.AddTag(tag) 1487 1488 if self.adopt_disks: 1489 if self.op.disk_template == constants.DT_PLAIN: 1490 # rename LVs to the newly-generated names; we need to construct 1491 # 'fake' LV disks with the old data, plus the new unique_id 1492 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] 1493 rename_to = [] 1494 for t_dsk, a_dsk in zip(tmp_disks, self.disks): 1495 rename_to.append(t_dsk.logical_id) 1496 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) 1497 result = self.rpc.call_blockdev_rename(self.pnode.uuid, 1498 zip(tmp_disks, rename_to)) 1499 result.Raise("Failed to rename adoped LVs") 1500 elif self.op.forthcoming: 1501 feedback_fn("Instance is forthcoming, not creating disks") 1502 else: 1503 feedback_fn("* creating instance disks...") 1504 try: 1505 CreateDisks(self, iobj, disks=disks) 1506 except errors.OpExecError: 1507 self.LogWarning("Device creation failed") 1508 for disk in disks: 1509 self.cfg.ReleaseDRBDMinors(disk.uuid) 1510 raise 1511 1512 feedback_fn("adding instance %s to cluster config" % self.op.instance_name) 1513 self.cfg.AddInstance(iobj, self.proc.GetECId(), replace=self.op.commit) 1514 1515 feedback_fn("adding disks to cluster config") 1516 for disk in disks: 1517 self.cfg.AddInstanceDisk(iobj.uuid, disk, replace=self.op.commit) 1518 1519 if self.op.forthcoming: 1520 feedback_fn("Instance is forthcoming; not creating the actual instance") 1521 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid))) 1522 1523 # re-read the instance from the configuration 
1524 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1525 1526 if self.op.mode == constants.INSTANCE_IMPORT: 1527 # Release unused nodes 1528 ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node_uuid]) 1529 else: 1530 # Release all nodes 1531 ReleaseLocks(self, locking.LEVEL_NODE) 1532 1533 # Wipe disks 1534 disk_abort = False 1535 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: 1536 feedback_fn("* wiping instance disks...") 1537 try: 1538 WipeDisks(self, iobj) 1539 except errors.OpExecError, err: 1540 logging.exception("Wiping disks failed") 1541 self.LogWarning("Wiping instance disks failed (%s)", err) 1542 disk_abort = True 1543 1544 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1545 1546 # Image disks 1547 os_image = objects.GetOSImage(iobj.osparams) 1548 disk_abort = False 1549 1550 if not self.adopt_disks and os_image is not None: 1551 feedback_fn("* imaging instance disks...") 1552 try: 1553 ImageDisks(self, iobj, os_image) 1554 except errors.OpExecError, err: 1555 logging.exception("Imaging disks failed") 1556 self.LogWarning("Imaging instance disks failed (%s)", err) 1557 disk_abort = True 1558 1559 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1560 1561 # instance disks are now active 1562 iobj.disks_active = True 1563 1564 # Release all node resource locks 1565 ReleaseLocks(self, locking.LEVEL_NODE_RES) 1566 1567 if iobj.os: 1568 result = self.rpc.call_os_diagnose([iobj.primary_node])[iobj.primary_node] 1569 result.Raise("Failed to get OS '%s'" % iobj.os) 1570 1571 trusted = None 1572 1573 for (name, _, _, _, _, _, _, os_trusted) in result.payload: 1574 if name == objects.OS.GetName(iobj.os): 1575 trusted = os_trusted 1576 break 1577 1578 if trusted is None: 1579 raise errors.OpPrereqError("OS '%s' is not available in node '%s'" % 1580 (iobj.os, iobj.primary_node)) 1581 elif trusted: 1582 self.RunOsScripts(feedback_fn, iobj) 1583 else: 1584 self.RunOsScriptsVirtualized(feedback_fn, iobj) 1585 # Instance is modified by 'RunOsScriptsVirtualized', 1586 # therefore, it must be retrieved once again from the 1587 # configuration, otherwise there will be a config object 1588 # version mismatch. 1589 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1590 1591 # Update instance metadata so that it can be reached from the 1592 # metadata service. 1593 UpdateMetadata(feedback_fn, self.rpc, iobj, 1594 osparams_private=self.op.osparams_private, 1595 osparams_secret=self.op.osparams_secret) 1596 1597 assert not self.owned_locks(locking.LEVEL_NODE_RES) 1598 1599 if self.op.start: 1600 iobj.admin_state = constants.ADMINST_UP 1601 self.cfg.Update(iobj, feedback_fn) 1602 logging.info("Starting instance %s on node %s", self.op.instance_name, 1603 self.pnode.name) 1604 feedback_fn("* starting instance...") 1605 result = self.rpc.call_instance_start(self.pnode.uuid, (iobj, None, None), 1606 False, self.op.reason) 1607 result.Raise("Could not start instance") 1608 1609 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid)))
1610
1611    def PrepareRetry(self, feedback_fn):
1612      # A temporary lack of resources can only happen if opportunistic locking
1613      # is used.
1614      assert self.op.opportunistic_locking
1615
1616      logging.info("Opportunistic locking did not succeed, falling back to"
1617                   " full lock allocation")
1618      feedback_fn("* falling back to full lock allocation")
1619      self.op.opportunistic_locking = False
1620
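For context, this logical unit serves the instance-creation opcode. A hedged sketch of how such an opcode might be built programmatically (field values are illustrative; consult opcodes.OpInstanceCreate for the authoritative parameter list):

    from ganeti import constants
    from ganeti import opcodes

    op = opcodes.OpInstanceCreate(
      instance_name="instance1.example.com",
      mode=constants.INSTANCE_CREATE,
      disk_template=constants.DT_DRBD8,
      disks=[{constants.IDISK_SIZE: 10240}],
      nics=[{}],
      os_type="debootstrap+default",
      pnode="node1.example.com",
      snode="node2.example.com",
      start=True)

The same operation is typically reached from the command line via gnt-instance add.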