Source Code for Module ganeti.cmdlib.instance_create

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30  """Logical unit for creating a single instance.""" 
  31   
  32  import OpenSSL 
  33  import logging 
  34  import os 
  35   
  36   
  37  from ganeti import compat 
  38  from ganeti import constants 
  39  from ganeti import errors 
  40  from ganeti import hypervisor 
  41  from ganeti import locking 
  42  from ganeti.masterd import iallocator 
  43  from ganeti import masterd 
  44  from ganeti import netutils 
  45  from ganeti import objects 
  46  from ganeti import pathutils 
  47  from ganeti import utils 
  48  from ganeti.utils import retry 
  49  from ganeti import serializer 
  50   
  51  from ganeti.cmdlib.base import LogicalUnit 
  52   
  53  from ganeti.cmdlib.common import \ 
  54    CheckNodeOnline, \ 
  55    CheckParamsNotGlobal, \ 
  56    IsExclusiveStorageEnabledNode, CheckHVParams, CheckOSParams, \ 
  57    ExpandNodeUuidAndName, \ 
  58    IsValidDiskAccessModeCombination, \ 
  59    CheckDiskTemplateEnabled, CheckIAllocatorOrNode, CheckOSImage, \ 
  60    IsInstanceRunning, DetermineImageSize 
  61  from ganeti.cmdlib.instance_storage import CalculateFileStorageDir, \ 
  62    CheckNodesFreeDiskPerVG, CheckRADOSFreeSpace, CheckSpindlesExclusiveStorage, \ 
  63    ComputeDiskSizePerVG, CreateDisks, \ 
  64    GenerateDiskTemplate, CommitDisks, StartInstanceDisks, \ 
  65    WaitForSync, ComputeDisks, \ 
  66    TemporaryDisk, ImageDisks, WipeDisks 
  67  from ganeti.cmdlib.instance_utils import \ 
  68    CheckNodeNotDrained, CopyLockList, \ 
  69    ReleaseLocks, CheckNodeVmCapable, \ 
  70    RemoveDisks, CheckNodeFreeMemory, \ 
  71    UpdateMetadata, CheckForConflictingIp, \ 
  72    ComputeInstanceCommunicationNIC, \ 
  73    ComputeIPolicyInstanceSpecViolation, \ 
  74    CheckHostnameSane, CheckOpportunisticLocking, \ 
  75    ComputeFullBeParams, ComputeNics, GetClusterDomainSecret, \ 
  76    CheckInstanceExistence, CreateInstanceAllocRequest, BuildInstanceHookEnv, \ 
  77    NICListToTuple, CheckNicsBridgesExist, CheckCompressionTool 
  78  import ganeti.masterd.instance 
  79   
  80   
  81  class LUInstanceCreate(LogicalUnit):
  82    """Create an instance.
  83
  84    """
  85    HPATH = "instance-add"
  86    HTYPE = constants.HTYPE_INSTANCE
  87    REQ_BGL = False
  88
  89    def _CheckDiskTemplateValid(self):
  90      """Checks validity of disk template.
  91
  92      """
  93      cluster = self.cfg.GetClusterInfo()
  94      if self.op.disk_template is None:
  95        # FIXME: It would be better to take the default disk template from the
  96        # ipolicy, but for the ipolicy we need the primary node, which we get from
  97        # the iallocator, which wants the disk template as input. To solve this
  98        # chicken-and-egg problem, it should be possible to specify just a node
  99        # group from the iallocator and take the ipolicy from that.
 100        self.op.disk_template = cluster.enabled_disk_templates[0]
 101      CheckDiskTemplateEnabled(cluster, self.op.disk_template)
102
 103    def _CheckDiskArguments(self):
 104      """Checks validity of disk-related arguments.
 105
 106      """
 107      # check that disk's names are unique and valid
 108      utils.ValidateDeviceNames("disk", self.op.disks)
 109
 110      self._CheckDiskTemplateValid()
 111
 112      # check disks. parameter names and consistent adopt/no-adopt strategy
 113      has_adopt = has_no_adopt = False
 114      for disk in self.op.disks:
 115        if self.op.disk_template != constants.DT_EXT:
 116          utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
 117        if constants.IDISK_ADOPT in disk:
 118          has_adopt = True
 119        else:
 120          has_no_adopt = True
 121      if has_adopt and has_no_adopt:
 122        raise errors.OpPrereqError("Either all disks are adopted or none is",
 123                                   errors.ECODE_INVAL)
 124      if has_adopt:
 125        if self.op.disk_template not in constants.DTS_MAY_ADOPT:
 126          raise errors.OpPrereqError("Disk adoption is not supported for the"
 127                                     " '%s' disk template" %
 128                                     self.op.disk_template,
 129                                     errors.ECODE_INVAL)
 130        if self.op.iallocator is not None:
 131          raise errors.OpPrereqError("Disk adoption not allowed with an"
 132                                     " iallocator script", errors.ECODE_INVAL)
 133        if self.op.mode == constants.INSTANCE_IMPORT:
 134          raise errors.OpPrereqError("Disk adoption not allowed for"
 135                                     " instance import", errors.ECODE_INVAL)
 136      else:
 137        if self.op.disk_template in constants.DTS_MUST_ADOPT:
 138          raise errors.OpPrereqError("Disk template %s requires disk adoption,"
 139                                     " but no 'adopt' parameter given" %
 140                                     self.op.disk_template,
 141                                     errors.ECODE_INVAL)
 142
 143      self.adopt_disks = has_adopt
144
 145    def _CheckVLANArguments(self):
 146      """ Check validity of VLANs if given
 147
 148      """
 149      for nic in self.op.nics:
 150        vlan = nic.get(constants.INIC_VLAN, None)
 151        if vlan:
 152          if vlan[0] == ".":
 153            # vlan starting with dot means single untagged vlan,
 154            # might be followed by trunk (:)
 155            if not vlan[1:].isdigit():
 156              vlanlist = vlan[1:].split(':')
 157              for vl in vlanlist:
 158                if not vl.isdigit():
 159                  raise errors.OpPrereqError("Specified VLAN parameter is "
 160                                             "invalid : %s" % vlan,
 161                                             errors.ECODE_INVAL)
 162          elif vlan[0] == ":":
 163            # Trunk - tagged only
 164            vlanlist = vlan[1:].split(':')
 165            for vl in vlanlist:
 166              if not vl.isdigit():
 167                raise errors.OpPrereqError("Specified VLAN parameter is invalid"
 168                                           " : %s" % vlan, errors.ECODE_INVAL)
 169          elif vlan.isdigit():
 170            # This is the simplest case. No dots, only single digit
 171            # -> Create untagged access port, dot needs to be added
 172            nic[constants.INIC_VLAN] = "." + vlan
 173          else:
 174            raise errors.OpPrereqError("Specified VLAN parameter is invalid"
 175                                       " : %s" % vlan, errors.ECODE_INVAL)
176
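
For reference, a small illustration (not part of the module) of the per-NIC "vlan" strings that the check above accepts, and the value stored on the NIC dict after normalization; any other form raises OpPrereqError with ECODE_INVAL:

# Illustration only -- not taken from instance_create.py.
accepted = [
  ("100", ".100"),                   # plain digits: rewritten to an untagged access port
  (".100", ".100"),                  # leading dot: single untagged VLAN
  (".100:200:300", ".100:200:300"),  # untagged VLAN followed by a tagged trunk
  (":200:300", ":200:300"),          # leading colon: tagged-only trunk
]
rejected = ["abc", ".100:abc", ":x"]  # these raise OpPrereqError(ECODE_INVAL)
for given, stored in accepted:
  print("%-14s -> %s" % (given, stored))
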
177 - def CheckArguments(self):
178 """Check arguments. 179 180 """ 181 if self.op.forthcoming and self.op.commit: 182 raise errors.OpPrereqError("Forthcoming generation and commiting are" 183 " mutually exclusive", errors.ECODE_INVAL) 184 185 # do not require name_check to ease forward/backward compatibility 186 # for tools 187 if self.op.no_install and self.op.start: 188 self.LogInfo("No-installation mode selected, disabling startup") 189 self.op.start = False 190 # validate/normalize the instance name 191 self.op.instance_name = \ 192 netutils.Hostname.GetNormalizedName(self.op.instance_name) 193 194 if self.op.ip_check and not self.op.name_check: 195 # TODO: make the ip check more flexible and not depend on the name check 196 raise errors.OpPrereqError("Cannot do IP address check without a name" 197 " check", errors.ECODE_INVAL) 198 199 # instance name verification 200 if self.op.name_check: 201 self.hostname = CheckHostnameSane(self, self.op.instance_name) 202 self.op.instance_name = self.hostname.name 203 # used in CheckPrereq for ip ping check 204 self.check_ip = self.hostname.ip 205 else: 206 self.check_ip = None 207 208 # add NIC for instance communication 209 if self.op.instance_communication: 210 nic_name = ComputeInstanceCommunicationNIC(self.op.instance_name) 211 212 for nic in self.op.nics: 213 if nic.get(constants.INIC_NAME, None) == nic_name: 214 break 215 else: 216 self.op.nics.append({constants.INIC_NAME: nic_name, 217 constants.INIC_MAC: constants.VALUE_GENERATE, 218 constants.INIC_IP: constants.NIC_IP_POOL, 219 constants.INIC_NETWORK: 220 self.cfg.GetInstanceCommunicationNetwork()}) 221 222 # timeouts for unsafe OS installs 223 if self.op.helper_startup_timeout is None: 224 self.op.helper_startup_timeout = constants.HELPER_VM_STARTUP 225 226 if self.op.helper_shutdown_timeout is None: 227 self.op.helper_shutdown_timeout = constants.HELPER_VM_SHUTDOWN 228 229 # check nics' parameter names 230 for nic in self.op.nics: 231 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES) 232 # check that NIC's parameters names are unique and valid 233 utils.ValidateDeviceNames("NIC", self.op.nics) 234 235 self._CheckVLANArguments() 236 237 self._CheckDiskArguments() 238 assert self.op.disk_template is not None 239 240 # file storage checks 241 if (self.op.file_driver and 242 not self.op.file_driver in constants.FILE_DRIVER): 243 raise errors.OpPrereqError("Invalid file driver name '%s'" % 244 self.op.file_driver, errors.ECODE_INVAL) 245 246 # set default file_driver if unset and required 247 if (not self.op.file_driver and 248 self.op.disk_template in constants.DTS_FILEBASED): 249 self.op.file_driver = constants.FD_DEFAULT 250 251 ### Node/iallocator related checks 252 CheckIAllocatorOrNode(self, "iallocator", "pnode") 253 254 if self.op.pnode is not None: 255 if self.op.disk_template in constants.DTS_INT_MIRROR: 256 if self.op.snode is None: 257 raise errors.OpPrereqError("The networked disk templates need" 258 " a mirror node", errors.ECODE_INVAL) 259 elif self.op.snode: 260 self.LogWarning("Secondary node will be ignored on non-mirrored disk" 261 " template") 262 self.op.snode = None 263 264 CheckOpportunisticLocking(self.op) 265 266 if self.op.mode == constants.INSTANCE_IMPORT: 267 # On import force_variant must be True, because if we forced it at 268 # initial install, our only chance when importing it back is that it 269 # works again! 
270 self.op.force_variant = True 271 272 if self.op.no_install: 273 self.LogInfo("No-installation mode has no effect during import") 274 275 if objects.GetOSImage(self.op.osparams): 276 self.LogInfo("OS image has no effect during import") 277 elif self.op.mode == constants.INSTANCE_CREATE: 278 os_image = CheckOSImage(self.op) 279 280 if self.op.os_type is None and os_image is None: 281 raise errors.OpPrereqError("No guest OS or OS image specified", 282 errors.ECODE_INVAL) 283 284 if self.op.os_type is not None \ 285 and self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os: 286 raise errors.OpPrereqError("Guest OS '%s' is not allowed for" 287 " installation" % self.op.os_type, 288 errors.ECODE_STATE) 289 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 290 if objects.GetOSImage(self.op.osparams): 291 self.LogInfo("OS image has no effect during import") 292 293 self._cds = GetClusterDomainSecret() 294 295 # Check handshake to ensure both clusters have the same domain secret 296 src_handshake = self.op.source_handshake 297 if not src_handshake: 298 raise errors.OpPrereqError("Missing source handshake", 299 errors.ECODE_INVAL) 300 301 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds, 302 src_handshake) 303 if errmsg: 304 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg, 305 errors.ECODE_INVAL) 306 307 # Load and check source CA 308 self.source_x509_ca_pem = self.op.source_x509_ca 309 if not self.source_x509_ca_pem: 310 raise errors.OpPrereqError("Missing source X509 CA", 311 errors.ECODE_INVAL) 312 313 try: 314 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem, 315 self._cds) 316 except OpenSSL.crypto.Error, err: 317 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" % 318 (err, ), errors.ECODE_INVAL) 319 320 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) 321 if errcode is not None: 322 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ), 323 errors.ECODE_INVAL) 324 325 self.source_x509_ca = cert 326 327 src_instance_name = self.op.source_instance_name 328 if not src_instance_name: 329 raise errors.OpPrereqError("Missing source instance name", 330 errors.ECODE_INVAL) 331 332 self.source_instance_name = \ 333 netutils.GetHostname(name=src_instance_name).name 334 335 else: 336 raise errors.OpPrereqError("Invalid instance creation mode %r" % 337 self.op.mode, errors.ECODE_INVAL)
338
339 - def ExpandNames(self):
340 """ExpandNames for CreateInstance. 341 342 Figure out the right locks for instance creation. 343 344 """ 345 self.needed_locks = {} 346 347 if self.op.commit: 348 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 349 if name is None: 350 raise errors.OpPrereqError("Instance %s unknown" % 351 self.op.instance_name, 352 errors.ECODE_INVAL) 353 self.op.instance_name = name 354 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 355 raise errors.OpPrereqError("Instance %s (with uuid %s) not forthcoming" 356 " but --commit was passed." % (name, uuid), 357 errors.ECODE_STATE) 358 logging.debug("Verified that instance %s with uuid %s is forthcoming", 359 name, uuid) 360 else: 361 # this is just a preventive check, but someone might still add this 362 # instance in the meantime; we check again in CheckPrereq 363 CheckInstanceExistence(self, self.op.instance_name) 364 365 self.add_locks[locking.LEVEL_INSTANCE] = self.op.instance_name 366 367 if self.op.commit: 368 (uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 369 self.needed_locks[locking.LEVEL_NODE] = self.cfg.GetInstanceNodes(uuid) 370 logging.debug("Forthcoming instance %s resides on %s", uuid, 371 self.needed_locks[locking.LEVEL_NODE]) 372 elif self.op.iallocator: 373 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by 374 # specifying a group on instance creation and then selecting nodes from 375 # that group 376 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 377 378 if self.op.opportunistic_locking: 379 self.opportunistic_locks[locking.LEVEL_NODE] = True 380 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True 381 if self.op.disk_template == constants.DT_DRBD8: 382 self.opportunistic_locks_count[locking.LEVEL_NODE] = 2 383 self.opportunistic_locks_count[locking.LEVEL_NODE_RES] = 2 384 else: 385 (self.op.pnode_uuid, self.op.pnode) = \ 386 ExpandNodeUuidAndName(self.cfg, self.op.pnode_uuid, self.op.pnode) 387 nodelist = [self.op.pnode_uuid] 388 if self.op.snode is not None: 389 (self.op.snode_uuid, self.op.snode) = \ 390 ExpandNodeUuidAndName(self.cfg, self.op.snode_uuid, self.op.snode) 391 nodelist.append(self.op.snode_uuid) 392 self.needed_locks[locking.LEVEL_NODE] = nodelist 393 394 # in case of import lock the source node too 395 if self.op.mode == constants.INSTANCE_IMPORT: 396 src_node = self.op.src_node 397 src_path = self.op.src_path 398 399 if src_path is None: 400 self.op.src_path = src_path = self.op.instance_name 401 402 if src_node is None: 403 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 404 self.op.src_node = None 405 if os.path.isabs(src_path): 406 raise errors.OpPrereqError("Importing an instance from a path" 407 " requires a source node option", 408 errors.ECODE_INVAL) 409 else: 410 (self.op.src_node_uuid, self.op.src_node) = (_, src_node) = \ 411 ExpandNodeUuidAndName(self.cfg, self.op.src_node_uuid, src_node) 412 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: 413 self.needed_locks[locking.LEVEL_NODE].append(self.op.src_node_uuid) 414 if not os.path.isabs(src_path): 415 self.op.src_path = \ 416 utils.PathJoin(pathutils.EXPORT_DIR, src_path) 417 418 self.needed_locks[locking.LEVEL_NODE_RES] = \ 419 CopyLockList(self.needed_locks[locking.LEVEL_NODE]) 420 421 # Optimistically acquire shared group locks (we're reading the 422 # configuration). We can't just call GetInstanceNodeGroups, because the 423 # instance doesn't exist yet. Therefore we lock all node groups of all 424 # nodes we have. 
425 if self.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET: 426 # In the case we lock all nodes for opportunistic allocation, we have no 427 # choice than to lock all groups, because they're allocated before nodes. 428 # This is sad, but true. At least we release all those we don't need in 429 # CheckPrereq later. 430 self.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET 431 else: 432 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 433 list(self.cfg.GetNodeGroupsFromNodes( 434 self.needed_locks[locking.LEVEL_NODE])) 435 self.share_locks[locking.LEVEL_NODEGROUP] = 1
436
 437    def DeclareLocks(self, level):
 438      if level == locking.LEVEL_NODE_RES:
 439        if self.op.opportunistic_locking:
 440          self.needed_locks[locking.LEVEL_NODE_RES] = \
 441            CopyLockList(list(self.owned_locks(locking.LEVEL_NODE)))
442
 443    def _RunAllocator(self):
 444      """Run the allocator based on input opcode.
 445
 446      """
 447      if self.op.opportunistic_locking:
 448        # Only consider nodes for which a lock is held
 449        node_name_whitelist = self.cfg.GetNodeNames(
 450          set(self.owned_locks(locking.LEVEL_NODE)) &
 451          set(self.owned_locks(locking.LEVEL_NODE_RES)))
 452      else:
 453        node_name_whitelist = None
 454
 455      req = CreateInstanceAllocRequest(self.op, self.disks,
 456                                       self.nics, self.be_full,
 457                                       node_name_whitelist)
 458      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 459
 460      ial.Run(self.op.iallocator)
 461
 462      if not ial.success:
 463        # When opportunistic locks are used only a temporary failure is generated
 464        if self.op.opportunistic_locking:
 465          ecode = errors.ECODE_TEMP_NORES
 466          self.LogInfo("IAllocator '%s' failed on opportunistically acquired"
 467                       " nodes: %s", self.op.iallocator, ial.info)
 468        else:
 469          ecode = errors.ECODE_NORES
 470
 471        raise errors.OpPrereqError("Can't compute nodes using"
 472                                   " iallocator '%s': %s" %
 473                                   (self.op.iallocator, ial.info),
 474                                   ecode)
 475
 476      (self.op.pnode_uuid, self.op.pnode) = \
 477        ExpandNodeUuidAndName(self.cfg, None, ial.result[0])
 478      self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
 479                   self.op.instance_name, self.op.iallocator,
 480                   utils.CommaJoin(ial.result))
 481
 482      assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
 483
 484      if req.RequiredNodes() == 2:
 485        (self.op.snode_uuid, self.op.snode) = \
 486          ExpandNodeUuidAndName(self.cfg, None, ial.result[1])
487
 488    def BuildHooksEnv(self):
 489      """Build hooks env.
 490
 491      This runs on master, primary and secondary nodes of the instance.
 492
 493      """
 494      env = {
 495        "ADD_MODE": self.op.mode,
 496      }
 497      if self.op.mode == constants.INSTANCE_IMPORT:
 498        env["SRC_NODE"] = self.op.src_node
 499        env["SRC_PATH"] = self.op.src_path
 500        env["SRC_IMAGES"] = self.src_images
 501
 502      env.update(BuildInstanceHookEnv(
 503        name=self.op.instance_name,
 504        primary_node_name=self.op.pnode,
 505        secondary_node_names=self.cfg.GetNodeNames(self.secondaries),
 506        status=self.op.start,
 507        os_type=self.op.os_type,
 508        minmem=self.be_full[constants.BE_MINMEM],
 509        maxmem=self.be_full[constants.BE_MAXMEM],
 510        vcpus=self.be_full[constants.BE_VCPUS],
 511        nics=NICListToTuple(self, self.nics),
 512        disk_template=self.op.disk_template,
 513        # Note that self.disks here is not a list with objects.Disk
 514        # but with dicts as returned by ComputeDisks.
 515        disks=self.disks,
 516        bep=self.be_full,
 517        hvp=self.hv_full,
 518        hypervisor_name=self.op.hypervisor,
 519        tags=self.op.tags,
 520      ))
 521
 522      return env
523
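
As a rough illustration (node names and paths invented for this example), the import-specific part of the hooks environment assembled above looks roughly like this before BuildInstanceHookEnv adds the generic instance keys:

# Illustration only; values are made up.
env = {
  "ADD_MODE": "import",                                # self.op.mode
  "SRC_NODE": "node3.example.com",                     # self.op.src_node
  "SRC_PATH": "/srv/ganeti/export/inst1.example.com",  # self.op.src_path
  "SRC_IMAGES": ["/srv/ganeti/export/inst1.example.com/disk0_dump"],
}
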
 524    def BuildHooksNodes(self):
 525      """Build hooks nodes.
 526
 527      """
 528      nl = [self.cfg.GetMasterNode(), self.op.pnode_uuid] + self.secondaries
 529      return nl, nl
530
 531    def _ReadExportInfo(self):
 532      """Reads the export information from disk.
 533
 534      It will override the opcode source node and path with the actual
 535      information, if these two were not specified before.
 536
 537      @return: the export information
 538
 539      """
 540      assert self.op.mode == constants.INSTANCE_IMPORT
 541
 542      if self.op.src_node_uuid is None:
 543        locked_nodes = self.owned_locks(locking.LEVEL_NODE)
 544        exp_list = self.rpc.call_export_list(locked_nodes)
 545        found = False
 546        for node_uuid in exp_list:
 547          if exp_list[node_uuid].fail_msg:
 548            continue
 549          if self.op.src_path in exp_list[node_uuid].payload:
 550            found = True
 551            self.op.src_node = self.cfg.GetNodeInfo(node_uuid).name
 552            self.op.src_node_uuid = node_uuid
 553            self.op.src_path = utils.PathJoin(pathutils.EXPORT_DIR,
 554                                              self.op.src_path)
 555            break
 556        if not found:
 557          raise errors.OpPrereqError("No export found for relative path %s" %
 558                                     self.op.src_path, errors.ECODE_INVAL)
 559
 560      CheckNodeOnline(self, self.op.src_node_uuid)
 561      result = self.rpc.call_export_info(self.op.src_node_uuid, self.op.src_path)
 562      result.Raise("No export or invalid export found in dir %s" %
 563                   self.op.src_path)
 564
 565      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
 566      if not export_info.has_section(constants.INISECT_EXP):
 567        raise errors.ProgrammerError("Corrupted export config",
 568                                     errors.ECODE_ENVIRON)
 569
 570      ei_version = export_info.get(constants.INISECT_EXP, "version")
 571      if int(ei_version) != constants.EXPORT_VERSION:
 572        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
 573                                   (ei_version, constants.EXPORT_VERSION),
 574                                   errors.ECODE_ENVIRON)
 575      return export_info
576
 577    def _ReadExportParams(self, einfo):
 578      """Use export parameters as defaults.
 579
 580      In case the opcode doesn't specify (as in override) some instance
 581      parameters, then try to use them from the export information, if
 582      that declares them.
 583
 584      """
 585      self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
 586
 587      if not self.op.disks:
 588        disks = []
 589        # TODO: import the disk iv_name too
 590        for idx in range(constants.MAX_DISKS):
 591          if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
 592            disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
 593            disk_name = einfo.get(constants.INISECT_INS, "disk%d_name" % idx)
 594            disk = {
 595              constants.IDISK_SIZE: disk_sz,
 596              constants.IDISK_NAME: disk_name
 597            }
 598            disks.append(disk)
 599        self.op.disks = disks
 600        if not disks and self.op.disk_template != constants.DT_DISKLESS:
 601          raise errors.OpPrereqError("No disk info specified and the export"
 602                                     " is missing the disk information",
 603                                     errors.ECODE_INVAL)
 604
 605      if not self.op.nics:
 606        nics = []
 607        for idx in range(constants.MAX_NICS):
 608          if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
 609            ndict = {}
 610            for name in [constants.INIC_IP,
 611                         constants.INIC_MAC, constants.INIC_NAME]:
 612              nic_param_name = "nic%d_%s" % (idx, name)
 613              if einfo.has_option(constants.INISECT_INS, nic_param_name):
 614                v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
 615                ndict[name] = v
 616            network = einfo.get(constants.INISECT_INS,
 617                                "nic%d_%s" % (idx, constants.INIC_NETWORK))
 618            # in case network is given link and mode are inherited
 619            # from nodegroup's netparams and thus should not be passed here
 620            if network:
 621              ndict[constants.INIC_NETWORK] = network
 622            else:
 623              for name in list(constants.NICS_PARAMETERS):
 624                v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
 625                ndict[name] = v
 626            nics.append(ndict)
 627          else:
 628            break
 629        self.op.nics = nics
 630
 631      if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
 632        self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
 633
 634      if (self.op.hypervisor is None and
 635          einfo.has_option(constants.INISECT_INS, "hypervisor")):
 636        self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
 637
 638      if einfo.has_section(constants.INISECT_HYP):
 639        # use the export parameters but do not override the ones
 640        # specified by the user
 641        for name, value in einfo.items(constants.INISECT_HYP):
 642          if name not in self.op.hvparams:
 643            self.op.hvparams[name] = value
 644
 645      if einfo.has_section(constants.INISECT_BEP):
 646        # use the parameters, without overriding
 647        for name, value in einfo.items(constants.INISECT_BEP):
 648          if name not in self.op.beparams:
 649            self.op.beparams[name] = value
 650          # Compatibility for the old "memory" be param
 651          if name == constants.BE_MEMORY:
 652            if constants.BE_MAXMEM not in self.op.beparams:
 653              self.op.beparams[constants.BE_MAXMEM] = value
 654            if constants.BE_MINMEM not in self.op.beparams:
 655              self.op.beparams[constants.BE_MINMEM] = value
 656      else:
 657        # try to read the parameters old style, from the main section
 658        for name in constants.BES_PARAMETERS:
 659          if (name not in self.op.beparams and
 660              einfo.has_option(constants.INISECT_INS, name)):
 661            self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
 662
 663      if einfo.has_section(constants.INISECT_OSP):
 664        # use the parameters, without overriding
 665        for name, value in einfo.items(constants.INISECT_OSP):
 666          if name not in self.op.osparams:
 667            self.op.osparams[name] = value
 668
 669      if einfo.has_section(constants.INISECT_OSP_PRIVATE):
 670        # use the parameters, without overriding
 671        for name, value in einfo.items(constants.INISECT_OSP_PRIVATE):
 672          if name not in self.op.osparams_private:
 673            self.op.osparams_private[name] = serializer.Private(value, descr=name)
674
 675    def _RevertToDefaults(self, cluster):
 676      """Revert the instance parameters to the default values.
 677
 678      """
 679      # hvparams
 680      hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
 681      for name in self.op.hvparams.keys():
 682        if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
 683          del self.op.hvparams[name]
 684      # beparams
 685      be_defs = cluster.SimpleFillBE({})
 686      for name in self.op.beparams.keys():
 687        if name in be_defs and be_defs[name] == self.op.beparams[name]:
 688          del self.op.beparams[name]
 689      # nic params
 690      nic_defs = cluster.SimpleFillNIC({})
 691      for nic in self.op.nics:
 692        for name in constants.NICS_PARAMETERS:
 693          if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
 694            del nic[name]
 695      # osparams
 696      os_defs = cluster.SimpleFillOS(self.op.os_type, {})
 697      for name in self.op.osparams.keys():
 698        if name in os_defs and os_defs[name] == self.op.osparams[name]:
 699          del self.op.osparams[name]
 700
 701      os_defs_ = cluster.SimpleFillOS(self.op.os_type, {},
 702                                      os_params_private={})
 703      for name in self.op.osparams_private.keys():
 704        if name in os_defs_ and os_defs_[name] == self.op.osparams_private[name]:
 705          del self.op.osparams_private[name]
706
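
A short sketch (not from the module) of what _RevertToDefaults achieves when the opcode sets identify_defaults: parameters that merely repeat the cluster defaults are dropped again, so the new instance keeps tracking those defaults. The parameter values below are invented:

# Illustration only. Suppose the cluster-level hypervisor defaults are
# {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/vda1"}.
requested_hvparams = {"kernel_path": "/boot/vmlinuz",  # equals the default
                      "root_path": "/dev/vda2"}        # genuinely customized
# After _RevertToDefaults, only the customized value remains:
# self.op.hvparams == {"root_path": "/dev/vda2"}
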
708 """Set nodes as in the forthcoming instance 709 710 """ 711 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 712 inst = self.cfg.GetInstanceInfo(uuid) 713 self.op.pnode_uuid = inst.primary_node 714 self.op.pnode = self.cfg.GetNodeName(inst.primary_node) 715 sec_nodes = self.cfg.GetInstanceSecondaryNodes(uuid) 716 node_names = [self.op.pnode] 717 if sec_nodes: 718 self.op.snode_uuid = sec_nodes[0] 719 self.op.snode = self.cfg.GetNodeName(sec_nodes[0]) 720 node_names.append(self.op.snode) 721 self.LogInfo("Nodes of instance %s: %s", name, node_names)
722
723 - def CheckPrereq(self): # pylint: disable=R0914
724 """Check prerequisites. 725 726 """ 727 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) 728 729 if self.op.commit: 730 # Check that the instance is still on the cluster, forthcoming, and 731 # still resides on the nodes we acquired. 732 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 733 if uuid is None: 734 raise errors.OpPrereqError("Instance %s disappeared from the cluster" 735 " while waiting for locks" 736 % (self.op.instance_name,), 737 errors.ECODE_STATE) 738 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 739 raise errors.OpPrereqError("Instance %s (with uuid %s) is no longer" 740 " forthcoming" % (name, uuid), 741 errors.ECODE_STATE) 742 required_nodes = self.cfg.GetInstanceNodes(uuid) 743 if not owned_nodes.issuperset(required_nodes): 744 raise errors.OpPrereqError("Forthcoming instance %s nodes changed" 745 " since locks were acquired; retry the" 746 " operation" % self.op.instance_name, 747 errors.ECODE_STATE) 748 else: 749 CheckInstanceExistence(self, self.op.instance_name) 750 751 # Check that the optimistically acquired groups are correct wrt the 752 # acquired nodes 753 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) 754 cur_groups = list(self.cfg.GetNodeGroupsFromNodes(owned_nodes)) 755 if not owned_groups.issuperset(cur_groups): 756 raise errors.OpPrereqError("New instance %s's node groups changed since" 757 " locks were acquired, current groups are" 758 " are '%s', owning groups '%s'; retry the" 759 " operation" % 760 (self.op.instance_name, 761 utils.CommaJoin(cur_groups), 762 utils.CommaJoin(owned_groups)), 763 errors.ECODE_STATE) 764 765 self.instance_file_storage_dir = CalculateFileStorageDir( 766 self.op.disk_template, self.cfg, self.op.instance_name, 767 self.op.file_storage_dir) 768 769 if self.op.mode == constants.INSTANCE_IMPORT: 770 export_info = self._ReadExportInfo() 771 self._ReadExportParams(export_info) 772 self._old_instance_name = export_info.get(constants.INISECT_INS, "name") 773 else: 774 self._old_instance_name = None 775 776 if (not self.cfg.GetVGName() and 777 self.op.disk_template not in constants.DTS_NOT_LVM): 778 raise errors.OpPrereqError("Cluster does not support lvm-based" 779 " instances", errors.ECODE_STATE) 780 781 if (self.op.hypervisor is None or 782 self.op.hypervisor == constants.VALUE_AUTO): 783 self.op.hypervisor = self.cfg.GetHypervisorType() 784 785 cluster = self.cfg.GetClusterInfo() 786 enabled_hvs = cluster.enabled_hypervisors 787 if self.op.hypervisor not in enabled_hvs: 788 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" 789 " cluster (%s)" % 790 (self.op.hypervisor, ",".join(enabled_hvs)), 791 errors.ECODE_STATE) 792 793 # Check tag validity 794 for tag in self.op.tags: 795 objects.TaggableObject.ValidateTag(tag) 796 797 # check hypervisor parameter syntax (locally) 798 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) 799 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, 800 self.op.hvparams) 801 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor) 802 hv_type.CheckParameterSyntax(filled_hvp) 803 self.hv_full = filled_hvp 804 # check that we don't specify global parameters on an instance 805 CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor", 806 "instance", "cluster") 807 808 # fill and remember the beparams dict 809 self.be_full = ComputeFullBeParams(self.op, cluster) 810 811 # build os parameters 812 if self.op.osparams_private is None: 813 self.op.osparams_private = 
serializer.PrivateDict() 814 if self.op.osparams_secret is None: 815 self.op.osparams_secret = serializer.PrivateDict() 816 817 self.os_full = cluster.SimpleFillOS( 818 self.op.os_type, 819 self.op.osparams, 820 os_params_private=self.op.osparams_private, 821 os_params_secret=self.op.osparams_secret 822 ) 823 824 # now that hvp/bep are in final format, let's reset to defaults, 825 # if told to do so 826 if self.op.identify_defaults: 827 self._RevertToDefaults(cluster) 828 829 # NIC buildup 830 self.nics = ComputeNics(self.op, cluster, self.check_ip, self.cfg, 831 self.proc.GetECId()) 832 833 # disk checks/pre-build 834 default_vg = self.cfg.GetVGName() 835 self.disks = ComputeDisks(self.op.disks, self.op.disk_template, default_vg) 836 837 if self.op.mode == constants.INSTANCE_IMPORT: 838 disk_images = [] 839 for idx in range(len(self.disks)): 840 option = "disk%d_dump" % idx 841 if export_info.has_option(constants.INISECT_INS, option): 842 # FIXME: are the old os-es, disk sizes, etc. useful? 843 export_name = export_info.get(constants.INISECT_INS, option) 844 image = utils.PathJoin(self.op.src_path, export_name) 845 disk_images.append(image) 846 else: 847 disk_images.append(False) 848 849 self.src_images = disk_images 850 851 if self.op.instance_name == self._old_instance_name: 852 for idx, nic in enumerate(self.nics): 853 if nic.mac == constants.VALUE_AUTO: 854 nic_mac_ini = "nic%d_mac" % idx 855 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) 856 857 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT 858 859 # ip ping checks (we use the same ip that was resolved in ExpandNames) 860 if self.op.ip_check: 861 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): 862 raise errors.OpPrereqError("IP %s of instance %s already in use" % 863 (self.check_ip, self.op.instance_name), 864 errors.ECODE_NOTUNIQUE) 865 866 #### mac address generation 867 # By generating here the mac address both the allocator and the hooks get 868 # the real final mac address rather than the 'auto' or 'generate' value. 869 # There is a race condition between the generation and the instance object 870 # creation, which means that we know the mac is valid now, but we're not 871 # sure it will be when we actually add the instance. If things go bad 872 # adding the instance will abort because of a duplicate mac, and the 873 # creation job will fail. 
874 for nic in self.nics: 875 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): 876 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) 877 878 #### allocator run 879 880 if self.op.iallocator is not None: 881 if self.op.commit: 882 self._GetNodesFromForthcomingInstance() 883 else: 884 self._RunAllocator() 885 886 # Release all unneeded node locks 887 keep_locks = filter(None, [self.op.pnode_uuid, self.op.snode_uuid, 888 self.op.src_node_uuid]) 889 ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) 890 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) 891 # Release all unneeded group locks 892 ReleaseLocks(self, locking.LEVEL_NODEGROUP, 893 keep=self.cfg.GetNodeGroupsFromNodes(keep_locks)) 894 895 assert (self.owned_locks(locking.LEVEL_NODE) == 896 self.owned_locks(locking.LEVEL_NODE_RES)), \ 897 "Node locks differ from node resource locks" 898 899 #### node related checks 900 901 # check primary node 902 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode_uuid) 903 assert self.pnode is not None, \ 904 "Cannot retrieve locked node %s" % self.op.pnode_uuid 905 if pnode.offline: 906 raise errors.OpPrereqError("Cannot use offline primary node '%s'" % 907 pnode.name, errors.ECODE_STATE) 908 if pnode.drained: 909 raise errors.OpPrereqError("Cannot use drained primary node '%s'" % 910 pnode.name, errors.ECODE_STATE) 911 if not pnode.vm_capable: 912 raise errors.OpPrereqError("Cannot use non-vm_capable primary node" 913 " '%s'" % pnode.name, errors.ECODE_STATE) 914 915 self.secondaries = [] 916 917 # Fill in any IPs from IP pools. This must happen here, because we need to 918 # know the nic's primary node, as specified by the iallocator 919 for idx, nic in enumerate(self.nics): 920 net_uuid = nic.network 921 if net_uuid is not None: 922 nobj = self.cfg.GetNetwork(net_uuid) 923 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.uuid) 924 if netparams is None: 925 raise errors.OpPrereqError("No netparams found for network" 926 " %s. 
Probably not connected to" 927 " node's %s nodegroup" % 928 (nobj.name, self.pnode.name), 929 errors.ECODE_INVAL) 930 self.LogInfo("NIC/%d inherits netparams %s" % 931 (idx, netparams.values())) 932 nic.nicparams = dict(netparams) 933 if nic.ip is not None: 934 if nic.ip.lower() == constants.NIC_IP_POOL: 935 try: 936 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId()) 937 except errors.ReservationError: 938 raise errors.OpPrereqError("Unable to get a free IP for NIC %d" 939 " from the address pool" % idx, 940 errors.ECODE_STATE) 941 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name) 942 else: 943 try: 944 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId(), 945 check=self.op.conflicts_check) 946 except errors.ReservationError: 947 raise errors.OpPrereqError("IP address %s already in use" 948 " or does not belong to network %s" % 949 (nic.ip, nobj.name), 950 errors.ECODE_NOTUNIQUE) 951 952 # net is None, ip None or given 953 elif self.op.conflicts_check: 954 CheckForConflictingIp(self, nic.ip, self.pnode.uuid) 955 956 # mirror node verification 957 if self.op.disk_template in constants.DTS_INT_MIRROR: 958 if self.op.snode_uuid == pnode.uuid: 959 raise errors.OpPrereqError("The secondary node cannot be the" 960 " primary node", errors.ECODE_INVAL) 961 CheckNodeOnline(self, self.op.snode_uuid) 962 CheckNodeNotDrained(self, self.op.snode_uuid) 963 CheckNodeVmCapable(self, self.op.snode_uuid) 964 self.secondaries.append(self.op.snode_uuid) 965 966 snode = self.cfg.GetNodeInfo(self.op.snode_uuid) 967 if pnode.group != snode.group: 968 self.LogWarning("The primary and secondary nodes are in two" 969 " different node groups; the disk parameters" 970 " from the first disk's node group will be" 971 " used") 972 973 nodes = [pnode] 974 if self.op.disk_template in constants.DTS_INT_MIRROR: 975 nodes.append(snode) 976 has_es = lambda n: IsExclusiveStorageEnabledNode(self.cfg, n) 977 excl_stor = compat.any(map(has_es, nodes)) 978 if excl_stor and not self.op.disk_template in constants.DTS_EXCL_STORAGE: 979 raise errors.OpPrereqError("Disk template %s not supported with" 980 " exclusive storage" % self.op.disk_template, 981 errors.ECODE_STATE) 982 for disk in self.disks: 983 CheckSpindlesExclusiveStorage(disk, excl_stor, True) 984 985 node_uuids = [pnode.uuid] + self.secondaries 986 987 if not self.adopt_disks: 988 if self.op.disk_template == constants.DT_RBD: 989 # _CheckRADOSFreeSpace() is just a placeholder. 990 # Any function that checks prerequisites can be placed here. 991 # Check if there is enough space on the RADOS cluster. 
992 CheckRADOSFreeSpace() 993 elif self.op.disk_template == constants.DT_EXT: 994 # FIXME: Function that checks prereqs if needed 995 pass 996 elif self.op.disk_template in constants.DTS_LVM: 997 # Check lv size requirements, if not adopting 998 req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks) 999 CheckNodesFreeDiskPerVG(self, node_uuids, req_sizes) 1000 else: 1001 # FIXME: add checks for other, non-adopting, non-lvm disk templates 1002 pass 1003 1004 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data 1005 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], 1006 disk[constants.IDISK_ADOPT]) 1007 for disk in self.disks]) 1008 if len(all_lvs) != len(self.disks): 1009 raise errors.OpPrereqError("Duplicate volume names given for adoption", 1010 errors.ECODE_INVAL) 1011 for lv_name in all_lvs: 1012 try: 1013 # FIXME: lv_name here is "vg/lv" need to ensure that other calls 1014 # to ReserveLV uses the same syntax 1015 self.cfg.ReserveLV(lv_name, self.proc.GetECId()) 1016 except errors.ReservationError: 1017 raise errors.OpPrereqError("LV named %s used by another instance" % 1018 lv_name, errors.ECODE_NOTUNIQUE) 1019 1020 vg_names = self.rpc.call_vg_list([pnode.uuid])[pnode.uuid] 1021 vg_names.Raise("Cannot get VG information from node %s" % pnode.name, 1022 prereq=True) 1023 1024 node_lvs = self.rpc.call_lv_list([pnode.uuid], 1025 vg_names.payload.keys())[pnode.uuid] 1026 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name, 1027 prereq=True) 1028 node_lvs = node_lvs.payload 1029 1030 delta = all_lvs.difference(node_lvs.keys()) 1031 if delta: 1032 raise errors.OpPrereqError("Missing logical volume(s): %s" % 1033 utils.CommaJoin(delta), 1034 errors.ECODE_INVAL) 1035 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]] 1036 if online_lvs: 1037 raise errors.OpPrereqError("Online logical volumes found, cannot" 1038 " adopt: %s" % utils.CommaJoin(online_lvs), 1039 errors.ECODE_STATE) 1040 # update the size of disk based on what is found 1041 for dsk in self.disks: 1042 dsk[constants.IDISK_SIZE] = \ 1043 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG], 1044 dsk[constants.IDISK_ADOPT])][0])) 1045 1046 elif self.op.disk_template == constants.DT_BLOCK: 1047 # Normalize and de-duplicate device paths 1048 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT]) 1049 for disk in self.disks]) 1050 if len(all_disks) != len(self.disks): 1051 raise errors.OpPrereqError("Duplicate disk names given for adoption", 1052 errors.ECODE_INVAL) 1053 baddisks = [d for d in all_disks 1054 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)] 1055 if baddisks: 1056 raise errors.OpPrereqError("Device node(s) %s lie outside %s and" 1057 " cannot be adopted" % 1058 (utils.CommaJoin(baddisks), 1059 constants.ADOPTABLE_BLOCKDEV_ROOT), 1060 errors.ECODE_INVAL) 1061 1062 node_disks = self.rpc.call_bdev_sizes([pnode.uuid], 1063 list(all_disks))[pnode.uuid] 1064 node_disks.Raise("Cannot get block device information from node %s" % 1065 pnode.name, prereq=True) 1066 node_disks = node_disks.payload 1067 delta = all_disks.difference(node_disks.keys()) 1068 if delta: 1069 raise errors.OpPrereqError("Missing block device(s): %s" % 1070 utils.CommaJoin(delta), 1071 errors.ECODE_INVAL) 1072 for dsk in self.disks: 1073 dsk[constants.IDISK_SIZE] = \ 1074 int(float(node_disks[dsk[constants.IDISK_ADOPT]])) 1075 1076 # Check disk access param to be compatible with specified hypervisor 1077 node_info = self.cfg.GetNodeInfo(self.op.pnode_uuid) 1078 node_group = 
self.cfg.GetNodeGroup(node_info.group) 1079 group_disk_params = self.cfg.GetGroupDiskParams(node_group) 1080 group_access_type = group_disk_params[self.op.disk_template].get( 1081 constants.RBD_ACCESS, constants.DISK_KERNELSPACE 1082 ) 1083 for dsk in self.disks: 1084 access_type = dsk.get(constants.IDISK_ACCESS, group_access_type) 1085 if not IsValidDiskAccessModeCombination(self.op.hypervisor, 1086 self.op.disk_template, 1087 access_type): 1088 raise errors.OpPrereqError("Selected hypervisor (%s) cannot be" 1089 " used with %s disk access param" % 1090 (self.op.hypervisor, access_type), 1091 errors.ECODE_STATE) 1092 1093 # Verify instance specs 1094 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) 1095 ispec = { 1096 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), 1097 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), 1098 constants.ISPEC_DISK_COUNT: len(self.disks), 1099 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE] 1100 for disk in self.disks], 1101 constants.ISPEC_NIC_COUNT: len(self.nics), 1102 constants.ISPEC_SPINDLE_USE: spindle_use, 1103 } 1104 1105 group_info = self.cfg.GetNodeGroup(pnode.group) 1106 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) 1107 disk_types = [self.op.disk_template] * len(self.disks) 1108 res = ComputeIPolicyInstanceSpecViolation(ipolicy, ispec, disk_types) 1109 if not self.op.ignore_ipolicy and res: 1110 msg = ("Instance allocation to group %s (%s) violates policy: %s" % 1111 (pnode.group, group_info.name, utils.CommaJoin(res))) 1112 raise errors.OpPrereqError(msg, errors.ECODE_INVAL) 1113 1114 CheckHVParams(self, node_uuids, self.op.hypervisor, self.op.hvparams) 1115 1116 CheckOSParams(self, True, node_uuids, self.op.os_type, self.os_full, 1117 self.op.force_variant) 1118 1119 CheckNicsBridgesExist(self, self.nics, self.pnode.uuid) 1120 1121 CheckCompressionTool(self, self.op.compress) 1122 1123 #TODO: _CheckExtParams (remotely) 1124 # Check parameters for extstorage 1125 1126 # memory check on primary node 1127 #TODO(dynmem): use MINMEM for checking 1128 if self.op.start: 1129 hvfull = objects.FillDict(cluster.hvparams.get(self.op.hypervisor, {}), 1130 self.op.hvparams) 1131 CheckNodeFreeMemory(self, self.pnode.uuid, 1132 "creating instance %s" % self.op.instance_name, 1133 self.be_full[constants.BE_MAXMEM], 1134 self.op.hypervisor, hvfull) 1135 1136 self.dry_run_result = list(node_uuids)
1137
1138    def _RemoveDegradedDisks(self, feedback_fn, disk_abort, instance):
1139      """Removes degraded disks and instance.
1140
1141      It optionally checks whether disks are degraded.  If the disks are
1142      degraded, they are removed and the instance is also removed from
1143      the configuration.
1144
1145      If L{disk_abort} is True, then the disks are considered degraded
1146      and removed, and the instance is removed from the configuration.
1147
1148      If L{disk_abort} is False, then it first checks whether disks are
1149      degraded and, if so, it removes the disks and the instance is
1150      removed from the configuration.
1151
1152      @type feedback_fn: callable
1153      @param feedback_fn: function used to send feedback back to the caller
1154
1155      @type disk_abort: boolean
1156      @param disk_abort:
1157        True if disks are degraded, False to first check if disks are
1158        degraded
1159      @type instance: L{objects.Instance}
1160      @param instance: instance containing the disks to check
1161
1162      @rtype: NoneType
1163      @return: None
1164      @raise errors.OpExecError: if disks are degraded
1165
1166      """
1167      disk_info = self.cfg.GetInstanceDisks(instance.uuid)
1168      if disk_abort:
1169        pass
1170      elif self.op.wait_for_sync:
1171        disk_abort = not WaitForSync(self, instance)
1172      elif utils.AnyDiskOfType(disk_info, constants.DTS_INT_MIRROR):
1173        # make sure the disks are not degraded (still sync-ing is ok)
1174        feedback_fn("* checking mirrors status")
1175        disk_abort = not WaitForSync(self, instance, oneshot=True)
1176      else:
1177        disk_abort = False
1178
1179      if disk_abort:
1180        RemoveDisks(self, instance)
1181        for disk_uuid in instance.disks:
1182          self.cfg.RemoveInstanceDisk(instance.uuid, disk_uuid)
1183        self.cfg.RemoveInstance(instance.uuid)
1184        raise errors.OpExecError("There are some degraded disks for"
1185                                 " this instance")
1186
1187 - def RunOsScripts(self, feedback_fn, iobj):
1188 """Run OS scripts 1189 1190 If necessary, disks are paused. It handles instance create, 1191 import, and remote import. 1192 1193 @type feedback_fn: callable 1194 @param feedback_fn: function used send feedback back to the caller 1195 1196 @type iobj: L{objects.Instance} 1197 @param iobj: instance object 1198 1199 """ 1200 if iobj.disks and not self.adopt_disks: 1201 disks = self.cfg.GetInstanceDisks(iobj.uuid) 1202 if self.op.mode == constants.INSTANCE_CREATE: 1203 os_image = objects.GetOSImage(self.op.osparams) 1204 1205 if os_image is None and not self.op.no_install: 1206 pause_sync = (not self.op.wait_for_sync and 1207 utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)) 1208 if pause_sync: 1209 feedback_fn("* pausing disk sync to install instance OS") 1210 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1211 (disks, iobj), 1212 True) 1213 for idx, success in enumerate(result.payload): 1214 if not success: 1215 logging.warn("pause-sync of instance %s for disk %d failed", 1216 self.op.instance_name, idx) 1217 1218 feedback_fn("* running the instance OS create scripts...") 1219 # FIXME: pass debug option from opcode to backend 1220 os_add_result = \ 1221 self.rpc.call_instance_os_add(self.pnode.uuid, 1222 (iobj, self.op.osparams_secret), 1223 False, 1224 self.op.debug_level) 1225 if pause_sync: 1226 feedback_fn("* resuming disk sync") 1227 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1228 (disks, iobj), 1229 False) 1230 for idx, success in enumerate(result.payload): 1231 if not success: 1232 logging.warn("resume-sync of instance %s for disk %d failed", 1233 self.op.instance_name, idx) 1234 1235 os_add_result.Raise("Could not add os for instance %s" 1236 " on node %s" % (self.op.instance_name, 1237 self.pnode.name)) 1238 1239 else: 1240 if self.op.mode == constants.INSTANCE_IMPORT: 1241 feedback_fn("* running the instance OS import scripts...") 1242 1243 transfers = [] 1244 1245 for idx, image in enumerate(self.src_images): 1246 if not image: 1247 continue 1248 1249 if iobj.os: 1250 dst_io = constants.IEIO_SCRIPT 1251 dst_ioargs = ((disks[idx], iobj), idx) 1252 else: 1253 dst_io = constants.IEIO_RAW_DISK 1254 dst_ioargs = (disks[idx], iobj) 1255 1256 # FIXME: pass debug option from opcode to backend 1257 dt = masterd.instance.DiskTransfer("disk/%s" % idx, 1258 constants.IEIO_FILE, (image, ), 1259 dst_io, dst_ioargs, 1260 None) 1261 transfers.append(dt) 1262 1263 import_result = \ 1264 masterd.instance.TransferInstanceData(self, feedback_fn, 1265 self.op.src_node_uuid, 1266 self.pnode.uuid, 1267 self.pnode.secondary_ip, 1268 self.op.compress, 1269 iobj, transfers) 1270 if not compat.all(import_result): 1271 self.LogWarning("Some disks for instance %s on node %s were not" 1272 " imported successfully" % (self.op.instance_name, 1273 self.pnode.name)) 1274 1275 rename_from = self._old_instance_name 1276 1277 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 1278 feedback_fn("* preparing remote import...") 1279 # The source cluster will stop the instance before attempting to make 1280 # a connection. In some cases stopping an instance can take a long 1281 # time, hence the shutdown timeout is added to the connection 1282 # timeout. 
1283 connect_timeout = (constants.RIE_CONNECT_TIMEOUT + 1284 self.op.source_shutdown_timeout) 1285 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) 1286 1287 assert iobj.primary_node == self.pnode.uuid 1288 disk_results = \ 1289 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, 1290 self.source_x509_ca, 1291 self._cds, self.op.compress, timeouts) 1292 if not compat.all(disk_results): 1293 # TODO: Should the instance still be started, even if some disks 1294 # failed to import (valid for local imports, too)? 1295 self.LogWarning("Some disks for instance %s on node %s were not" 1296 " imported successfully" % (self.op.instance_name, 1297 self.pnode.name)) 1298 1299 rename_from = self.source_instance_name 1300 1301 else: 1302 # also checked in the prereq part 1303 raise errors.ProgrammerError("Unknown OS initialization mode '%s'" 1304 % self.op.mode) 1305 1306 assert iobj.name == self.op.instance_name 1307 1308 # Run rename script on newly imported instance 1309 if iobj.os: 1310 feedback_fn("Running rename script for %s" % self.op.instance_name) 1311 result = self.rpc.call_instance_run_rename(self.pnode.uuid, iobj, 1312 rename_from, 1313 self.op.debug_level) 1314 result.Warn("Failed to run rename script for %s on node %s" % 1315 (self.op.instance_name, self.pnode.name), self.LogWarning)
1316
1317    def GetOsInstallPackageEnvironment(self, instance, script):
1318      """Returns the OS scripts environment for the helper VM
1319
1320      @type instance: L{objects.Instance}
1321      @param instance: instance for which the OS scripts are run
1322
1323      @type script: string
1324      @param script: script to run (e.g.,
1325        constants.OS_SCRIPT_CREATE_UNTRUSTED)
1326
1327      @rtype: dict of string to string
1328      @return: OS scripts environment for the helper VM
1329
1330      """
1331      env = {"OS_SCRIPT": script}
1332
1333      # We pass only the instance's disks, not the helper VM's disks.
1334      if instance.hypervisor == constants.HT_KVM:
1335        prefix = "/dev/vd"
1336      elif instance.hypervisor in [constants.HT_XEN_PVM, constants.HT_XEN_HVM]:
1337        prefix = "/dev/xvd"
1338      else:
1339        raise errors.OpExecError("Cannot run OS scripts in a virtualized"
1340                                 " environment for hypervisor '%s'"
1341                                 % instance.hypervisor)
1342
1343      num_disks = len(self.cfg.GetInstanceDisks(instance.uuid))
1344
1345      for idx, disk_label in enumerate(utils.GetDiskLabels(prefix, num_disks + 1,
1346                                                           start=1)):
1347        env["DISK_%d_PATH" % idx] = disk_label
1348
1349      return env
1350
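
A hedged example of the environment returned above for a KVM instance with two disks, assuming utils.GetDiskLabels enumerates device names alphabetically and that start=1 skips the first label, which is taken by the helper VM's own boot disk:

# Illustration only; relies on the GetDiskLabels assumption stated above,
# and "create-untrusted" stands in for the real script constant.
env = {
  "OS_SCRIPT": "create-untrusted",  # e.g. constants.OS_SCRIPT_CREATE_UNTRUSTED
  "DISK_0_PATH": "/dev/vdb",        # first instance disk as seen by the helper VM
  "DISK_1_PATH": "/dev/vdc",        # second instance disk
}
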
1351    def UpdateInstanceOsInstallPackage(self, feedback_fn, instance, override_env):
1352      """Updates the OS parameter 'os-install-package' for an instance.
1353
1354      The OS install package is an archive containing an OS definition
1355      and a file containing the environment variables needed to run the
1356      OS scripts.
1357
1358      The OS install package is served by the metadata daemon to the
1359      instances, so the OS scripts can run inside the virtualized
1360      environment.
1361
1362      @type feedback_fn: callable
1363      @param feedback_fn: function used to send feedback back to the caller
1364
1365      @type instance: L{objects.Instance}
1366      @param instance: instance for which the OS parameter
1367        'os-install-package' is updated
1368
1369      @type override_env: dict of string to string
1370      @param override_env: if supplied, it overrides the environment of
1371        the export OS scripts archive
1372
1373      """
1374      if "os-install-package" in instance.osparams:
1375        feedback_fn("Using OS install package '%s'" %
1376                    instance.osparams["os-install-package"])
1377      else:
1378        result = self.rpc.call_os_export(instance.primary_node, instance,
1379                                         override_env)
1380        result.Raise("Could not export OS '%s'" % instance.os)
1381        instance.osparams["os-install-package"] = result.payload
1382
1383        feedback_fn("Created OS install package '%s'" % result.payload)
1384
1385 - def RunOsScriptsVirtualized(self, feedback_fn, instance):
1386 """Runs the OS scripts inside a safe virtualized environment. 1387 1388 The virtualized environment reuses the instance and temporarily 1389 creates a disk onto which the image of the helper VM is dumped. 1390 The temporary disk is used to boot the helper VM. The OS scripts 1391 are passed to the helper VM through the metadata daemon and the OS 1392 install package. 1393 1394 @type feedback_fn: callable 1395 @param feedback_fn: function used send feedback back to the caller 1396 1397 @type instance: L{objects.Instance} 1398 @param instance: instance for which the OS scripts must be run 1399 inside the virtualized environment 1400 1401 """ 1402 install_image = self.cfg.GetInstallImage() 1403 1404 if not install_image: 1405 raise errors.OpExecError("Cannot create install instance because an" 1406 " install image has not been specified") 1407 1408 disk_size = DetermineImageSize(self, install_image, instance.primary_node) 1409 1410 env = self.GetOsInstallPackageEnvironment( 1411 instance, 1412 constants.OS_SCRIPT_CREATE_UNTRUSTED) 1413 self.UpdateInstanceOsInstallPackage(feedback_fn, instance, env) 1414 UpdateMetadata(feedback_fn, self.rpc, instance, 1415 osparams_private=self.op.osparams_private, 1416 osparams_secret=self.op.osparams_secret) 1417 1418 with TemporaryDisk(self, 1419 instance, 1420 [(constants.DT_PLAIN, constants.DISK_RDWR, disk_size)], 1421 feedback_fn): 1422 feedback_fn("Activating instance disks") 1423 StartInstanceDisks(self, instance, False) 1424 1425 feedback_fn("Imaging disk with install image") 1426 ImageDisks(self, instance, install_image) 1427 1428 feedback_fn("Starting instance with install image") 1429 result = self.rpc.call_instance_start(instance.primary_node, 1430 (instance, [], []), 1431 False, self.op.reason) 1432 result.Raise("Could not start instance '%s' with the install image '%s'" 1433 % (instance.name, install_image)) 1434 1435 # First wait for the instance to start up 1436 running_check = lambda: IsInstanceRunning(self, instance, prereq=False) 1437 instance_up = retry.SimpleRetry(True, running_check, 5.0, 1438 self.op.helper_startup_timeout) 1439 if not instance_up: 1440 raise errors.OpExecError("Could not boot instance using install image" 1441 " '%s'" % install_image) 1442 1443 feedback_fn("Instance is up, now awaiting shutdown") 1444 1445 # Then for it to be finished, detected by its shutdown 1446 instance_up = retry.SimpleRetry(False, running_check, 20.0, 1447 self.op.helper_shutdown_timeout) 1448 if instance_up: 1449 self.LogWarning("Installation not completed prior to timeout, shutting" 1450 " down instance forcibly") 1451 1452 feedback_fn("Installation complete")
1453
1454 - def Exec(self, feedback_fn):
1455 """Create and add the instance to the cluster. 1456 1457 """ 1458 assert not (self.owned_locks(locking.LEVEL_NODE_RES) - 1459 self.owned_locks(locking.LEVEL_NODE)), \ 1460 "Node locks differ from node resource locks" 1461 1462 ht_kind = self.op.hypervisor 1463 if ht_kind in constants.HTS_REQ_PORT: 1464 network_port = self.cfg.AllocatePort() 1465 else: 1466 network_port = None 1467 1468 if self.op.commit: 1469 (instance_uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 1470 else: 1471 instance_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) 1472 1473 # This is ugly but we got a chicken-egg problem here 1474 # We can only take the group disk parameters, as the instance 1475 # has no disks yet (we are generating them right here). 1476 nodegroup = self.cfg.GetNodeGroup(self.pnode.group) 1477 1478 if self.op.commit: 1479 disks = self.cfg.GetInstanceDisks(instance_uuid) 1480 CommitDisks(disks) 1481 else: 1482 disks = GenerateDiskTemplate(self, 1483 self.op.disk_template, 1484 instance_uuid, self.pnode.uuid, 1485 self.secondaries, 1486 self.disks, 1487 self.instance_file_storage_dir, 1488 self.op.file_driver, 1489 0, 1490 feedback_fn, 1491 self.cfg.GetGroupDiskParams(nodegroup), 1492 forthcoming=self.op.forthcoming) 1493 1494 if self.op.os_type is None: 1495 os_type = "" 1496 else: 1497 os_type = self.op.os_type 1498 1499 iobj = objects.Instance(name=self.op.instance_name, 1500 uuid=instance_uuid, 1501 os=os_type, 1502 primary_node=self.pnode.uuid, 1503 nics=self.nics, disks=[], 1504 disk_template=self.op.disk_template, 1505 disks_active=False, 1506 admin_state=constants.ADMINST_DOWN, 1507 admin_state_source=constants.ADMIN_SOURCE, 1508 network_port=network_port, 1509 beparams=self.op.beparams, 1510 hvparams=self.op.hvparams, 1511 hypervisor=self.op.hypervisor, 1512 osparams=self.op.osparams, 1513 osparams_private=self.op.osparams_private, 1514 forthcoming=self.op.forthcoming, 1515 ) 1516 1517 if self.op.tags: 1518 for tag in self.op.tags: 1519 iobj.AddTag(tag) 1520 1521 if self.adopt_disks: 1522 if self.op.disk_template == constants.DT_PLAIN: 1523 # rename LVs to the newly-generated names; we need to construct 1524 # 'fake' LV disks with the old data, plus the new unique_id 1525 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] 1526 rename_to = [] 1527 for t_dsk, a_dsk in zip(tmp_disks, self.disks): 1528 rename_to.append(t_dsk.logical_id) 1529 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) 1530 result = self.rpc.call_blockdev_rename(self.pnode.uuid, 1531 zip(tmp_disks, rename_to)) 1532 result.Raise("Failed to rename adoped LVs") 1533 elif self.op.forthcoming: 1534 feedback_fn("Instance is forthcoming, not creating disks") 1535 else: 1536 feedback_fn("* creating instance disks...") 1537 try: 1538 CreateDisks(self, iobj, disks=disks) 1539 except errors.OpExecError: 1540 self.LogWarning("Device creation failed") 1541 for disk in disks: 1542 self.cfg.ReleaseDRBDMinors(disk.uuid) 1543 raise 1544 1545 feedback_fn("adding instance %s to cluster config" % self.op.instance_name) 1546 self.cfg.AddInstance(iobj, self.proc.GetECId(), replace=self.op.commit) 1547 1548 feedback_fn("adding disks to cluster config") 1549 for disk in disks: 1550 self.cfg.AddInstanceDisk(iobj.uuid, disk, replace=self.op.commit) 1551 1552 if self.op.forthcoming: 1553 feedback_fn("Instance is forthcoming; not creating the actual instance") 1554 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid))) 1555 1556 # re-read the instance from the configuration 
1557 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1558 1559 if self.op.mode == constants.INSTANCE_IMPORT: 1560 # Release unused nodes 1561 ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node_uuid]) 1562 else: 1563 # Release all nodes 1564 ReleaseLocks(self, locking.LEVEL_NODE) 1565 1566 # Wipe disks 1567 disk_abort = False 1568 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: 1569 feedback_fn("* wiping instance disks...") 1570 try: 1571 WipeDisks(self, iobj) 1572 except errors.OpExecError, err: 1573 logging.exception("Wiping disks failed") 1574 self.LogWarning("Wiping instance disks failed (%s)", err) 1575 disk_abort = True 1576 1577 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1578 1579 # Image disks 1580 os_image = objects.GetOSImage(iobj.osparams) 1581 disk_abort = False 1582 1583 if not self.adopt_disks and os_image is not None: 1584 feedback_fn("* imaging instance disks...") 1585 try: 1586 ImageDisks(self, iobj, os_image) 1587 except errors.OpExecError, err: 1588 logging.exception("Imaging disks failed") 1589 self.LogWarning("Imaging instance disks failed (%s)", err) 1590 disk_abort = True 1591 1592 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1593 1594 # instance disks are now active 1595 iobj.disks_active = True 1596 1597 # Release all node resource locks 1598 ReleaseLocks(self, locking.LEVEL_NODE_RES) 1599 1600 if iobj.os: 1601 result = self.rpc.call_os_diagnose([iobj.primary_node])[iobj.primary_node] 1602 result.Raise("Failed to get OS '%s'" % iobj.os) 1603 1604 trusted = None 1605 1606 for (name, _, _, _, _, _, _, os_trusted) in result.payload: 1607 if name == objects.OS.GetName(iobj.os): 1608 trusted = os_trusted 1609 break 1610 1611 if trusted is None: 1612 raise errors.OpPrereqError("OS '%s' is not available in node '%s'" % 1613 (iobj.os, iobj.primary_node)) 1614 elif trusted: 1615 self.RunOsScripts(feedback_fn, iobj) 1616 else: 1617 self.RunOsScriptsVirtualized(feedback_fn, iobj) 1618 # Instance is modified by 'RunOsScriptsVirtualized', 1619 # therefore, it must be retrieved once again from the 1620 # configuration, otherwise there will be a config object 1621 # version mismatch. 1622 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1623 1624 # Update instance metadata so that it can be reached from the 1625 # metadata service. 1626 UpdateMetadata(feedback_fn, self.rpc, iobj, 1627 osparams_private=self.op.osparams_private, 1628 osparams_secret=self.op.osparams_secret) 1629 1630 assert not self.owned_locks(locking.LEVEL_NODE_RES) 1631 1632 if self.op.start: 1633 iobj.admin_state = constants.ADMINST_UP 1634 self.cfg.Update(iobj, feedback_fn) 1635 logging.info("Starting instance %s on node %s", self.op.instance_name, 1636 self.pnode.name) 1637 feedback_fn("* starting instance...") 1638 result = self.rpc.call_instance_start(self.pnode.uuid, (iobj, None, None), 1639 False, self.op.reason) 1640 result.Raise("Could not start instance") 1641 1642 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid)))
1643
1644    def PrepareRetry(self, feedback_fn):
1645      # A temporary lack of resources can only happen if opportunistic locking
1646      # is used.
1647      assert self.op.opportunistic_locking
1648
1649      logging.info("Opportunistic locking did not succeed, falling back to"
1650                   " full lock allocation")
1651      feedback_fn("* falling back to full lock allocation")
1652      self.op.opportunistic_locking = False
1653
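
To put the listing in context, a minimal sketch of the kind of opcode this logical unit processes; the field values are invented and only a few of the many OpInstanceCreate fields are shown:

# Illustration only -- a rough sketch, not taken from this module.
from ganeti import constants
from ganeti import opcodes

op = opcodes.OpInstanceCreate(
  instance_name="inst1.example.com",
  mode=constants.INSTANCE_CREATE,
  disk_template=constants.DT_DRBD8,
  disks=[{constants.IDISK_SIZE: 10240}],  # one 10 GiB disk
  nics=[{}],                              # one NIC with default parameters
  os_type="debootstrap+default",          # example OS variant
  iallocator="hail",                      # let the allocator pick the nodes
  )
# Submitted through the job queue (for example by "gnt-instance add"), the
# opcode is dispatched to LUInstanceCreate; CheckArguments, ExpandNames,
# CheckPrereq and Exec above implement the actual creation work flow.
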