
Source Code for Module ganeti.cmdlib.instance_create

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30  """Logical unit for creating a single instance.""" 
  31   
  32  import OpenSSL 
  33  import logging 
  34  import os 
  35   
  36   
  37  from ganeti import compat 
  38  from ganeti import constants 
  39  from ganeti import errors 
  40  from ganeti import hypervisor 
  41  from ganeti import locking 
  42  from ganeti.masterd import iallocator 
  43  from ganeti import masterd 
  44  from ganeti import netutils 
  45  from ganeti import objects 
  46  from ganeti import pathutils 
  47  from ganeti import utils 
  48  from ganeti import serializer 
  49   
  50  from ganeti.cmdlib.base import LogicalUnit 
  51   
  52  from ganeti.cmdlib.common import \ 
  53    CheckNodeOnline, \ 
  54    CheckParamsNotGlobal, \ 
  55    IsExclusiveStorageEnabledNode, CheckHVParams, CheckOSParams, \ 
  56    ExpandNodeUuidAndName, \ 
  57    IsValidDiskAccessModeCombination, \ 
  58    CheckDiskTemplateEnabled, CheckIAllocatorOrNode, CheckOSImage 
  59  from ganeti.cmdlib.instance_helpervm import RunWithHelperVM 
  60  from ganeti.cmdlib.instance_storage import CalculateFileStorageDir, \ 
  61    CheckNodesFreeDiskPerVG, CheckRADOSFreeSpace, CheckSpindlesExclusiveStorage, \ 
  62    ComputeDiskSizePerVG, CreateDisks, \ 
  63    GenerateDiskTemplate, CommitDisks, \ 
  64    WaitForSync, ComputeDisks, \ 
  65    ImageDisks, WipeDisks 
  66  from ganeti.cmdlib.instance_utils import \ 
  67    CheckNodeNotDrained, CopyLockList, \ 
  68    ReleaseLocks, CheckNodeVmCapable, \ 
  69    RemoveDisks, CheckNodeFreeMemory, \ 
  70    UpdateMetadata, CheckForConflictingIp, \ 
  71    ComputeInstanceCommunicationNIC, \ 
  72    ComputeIPolicyInstanceSpecViolation, \ 
  73    CheckHostnameSane, CheckOpportunisticLocking, \ 
  74    ComputeFullBeParams, ComputeNics, GetClusterDomainSecret, \ 
  75    CheckInstanceExistence, CreateInstanceAllocRequest, BuildInstanceHookEnv, \ 
  76    NICListToTuple, CheckNicsBridgesExist, CheckCompressionTool 
  77  import ganeti.masterd.instance 
  78   
  79   
80 -class LUInstanceCreate(LogicalUnit):
81 """Create an instance. 82 83 """ 84 HPATH = "instance-add" 85 HTYPE = constants.HTYPE_INSTANCE 86 REQ_BGL = False 87
88 - def _CheckDiskTemplateValid(self):
89 """Checks validity of disk template. 90 91 """ 92 cluster = self.cfg.GetClusterInfo() 93 if self.op.disk_template is None: 94 # FIXME: It would be better to take the default disk template from the 95 # ipolicy, but for the ipolicy we need the primary node, which we get from 96 # the iallocator, which wants the disk template as input. To solve this 97 # chicken-and-egg problem, it should be possible to specify just a node 98 # group from the iallocator and take the ipolicy from that. 99 self.op.disk_template = cluster.enabled_disk_templates[0] 100 CheckDiskTemplateEnabled(cluster, self.op.disk_template)
101
102 - def _CheckDiskArguments(self):
103 """Checks validity of disk-related arguments. 104 105 """ 106 # check that disk's names are unique and valid 107 utils.ValidateDeviceNames("disk", self.op.disks) 108 109 self._CheckDiskTemplateValid() 110 111 # check disks. parameter names and consistent adopt/no-adopt strategy 112 has_adopt = has_no_adopt = False 113 for disk in self.op.disks: 114 if self.op.disk_template != constants.DT_EXT: 115 utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) 116 if constants.IDISK_ADOPT in disk: 117 has_adopt = True 118 else: 119 has_no_adopt = True 120 if has_adopt and has_no_adopt: 121 raise errors.OpPrereqError("Either all disks are adopted or none is", 122 errors.ECODE_INVAL) 123 if has_adopt: 124 if self.op.disk_template not in constants.DTS_MAY_ADOPT: 125 raise errors.OpPrereqError("Disk adoption is not supported for the" 126 " '%s' disk template" % 127 self.op.disk_template, 128 errors.ECODE_INVAL) 129 if self.op.iallocator is not None: 130 raise errors.OpPrereqError("Disk adoption not allowed with an" 131 " iallocator script", errors.ECODE_INVAL) 132 if self.op.mode == constants.INSTANCE_IMPORT: 133 raise errors.OpPrereqError("Disk adoption not allowed for" 134 " instance import", errors.ECODE_INVAL) 135 else: 136 if self.op.disk_template in constants.DTS_MUST_ADOPT: 137 raise errors.OpPrereqError("Disk template %s requires disk adoption," 138 " but no 'adopt' parameter given" % 139 self.op.disk_template, 140 errors.ECODE_INVAL) 141 142 self.adopt_disks = has_adopt
143
144 - def _CheckVLANArguments(self):
145 """ Check validity of VLANs if given 146 147 """ 148 for nic in self.op.nics: 149 vlan = nic.get(constants.INIC_VLAN, None) 150 if vlan: 151 if vlan[0] == ".": 152 # vlan starting with dot means single untagged vlan, 153 # might be followed by trunk (:) 154 if not vlan[1:].isdigit(): 155 vlanlist = vlan[1:].split(':') 156 for vl in vlanlist: 157 if not vl.isdigit(): 158 raise errors.OpPrereqError("Specified VLAN parameter is " 159 "invalid : %s" % vlan, 160 errors.ECODE_INVAL) 161 elif vlan[0] == ":": 162 # Trunk - tagged only 163 vlanlist = vlan[1:].split(':') 164 for vl in vlanlist: 165 if not vl.isdigit(): 166 raise errors.OpPrereqError("Specified VLAN parameter is invalid" 167 " : %s" % vlan, errors.ECODE_INVAL) 168 elif vlan.isdigit(): 169 # This is the simplest case. No dots, only single digit 170 # -> Create untagged access port, dot needs to be added 171 nic[constants.INIC_VLAN] = "." + vlan 172 else: 173 raise errors.OpPrereqError("Specified VLAN parameter is invalid" 174 " : %s" % vlan, errors.ECODE_INVAL)
175
176 - def CheckArguments(self):
177 """Check arguments. 178 179 """ 180 if self.op.forthcoming and self.op.commit: 181 raise errors.OpPrereqError("Forthcoming generation and commiting are" 182 " mutually exclusive", errors.ECODE_INVAL) 183 184 # do not require name_check to ease forward/backward compatibility 185 # for tools 186 if self.op.no_install and self.op.start: 187 self.LogInfo("No-installation mode selected, disabling startup") 188 self.op.start = False 189 # validate/normalize the instance name 190 self.op.instance_name = \ 191 netutils.Hostname.GetNormalizedName(self.op.instance_name) 192 193 if self.op.ip_check and not self.op.name_check: 194 # TODO: make the ip check more flexible and not depend on the name check 195 raise errors.OpPrereqError("Cannot do IP address check without a name" 196 " check", errors.ECODE_INVAL) 197 198 # instance name verification 199 if self.op.name_check: 200 self.hostname = CheckHostnameSane(self, self.op.instance_name) 201 self.op.instance_name = self.hostname.name 202 # used in CheckPrereq for ip ping check 203 self.check_ip = self.hostname.ip 204 else: 205 self.check_ip = None 206 207 # add NIC for instance communication 208 if self.op.instance_communication: 209 nic_name = ComputeInstanceCommunicationNIC(self.op.instance_name) 210 211 for nic in self.op.nics: 212 if nic.get(constants.INIC_NAME, None) == nic_name: 213 break 214 else: 215 self.op.nics.append({constants.INIC_NAME: nic_name, 216 constants.INIC_MAC: constants.VALUE_GENERATE, 217 constants.INIC_IP: constants.NIC_IP_POOL, 218 constants.INIC_NETWORK: 219 self.cfg.GetInstanceCommunicationNetwork()}) 220 221 # timeouts for unsafe OS installs 222 if self.op.helper_startup_timeout is None: 223 self.op.helper_startup_timeout = constants.HELPER_VM_STARTUP 224 225 if self.op.helper_shutdown_timeout is None: 226 self.op.helper_shutdown_timeout = constants.HELPER_VM_SHUTDOWN 227 228 # check nics' parameter names 229 for nic in self.op.nics: 230 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES) 231 # check that NIC's parameters names are unique and valid 232 utils.ValidateDeviceNames("NIC", self.op.nics) 233 234 self._CheckVLANArguments() 235 236 self._CheckDiskArguments() 237 assert self.op.disk_template is not None 238 239 # file storage checks 240 if (self.op.file_driver and 241 not self.op.file_driver in constants.FILE_DRIVER): 242 raise errors.OpPrereqError("Invalid file driver name '%s'" % 243 self.op.file_driver, errors.ECODE_INVAL) 244 245 # set default file_driver if unset and required 246 if (not self.op.file_driver and 247 self.op.disk_template in constants.DTS_FILEBASED): 248 self.op.file_driver = constants.FD_DEFAULT 249 250 ### Node/iallocator related checks 251 CheckIAllocatorOrNode(self, "iallocator", "pnode") 252 253 if self.op.pnode is not None: 254 if self.op.disk_template in constants.DTS_INT_MIRROR: 255 if self.op.snode is None: 256 raise errors.OpPrereqError("The networked disk templates need" 257 " a mirror node", errors.ECODE_INVAL) 258 elif self.op.snode: 259 self.LogWarning("Secondary node will be ignored on non-mirrored disk" 260 " template") 261 self.op.snode = None 262 263 CheckOpportunisticLocking(self.op) 264 265 if self.op.mode == constants.INSTANCE_IMPORT: 266 # On import force_variant must be True, because if we forced it at 267 # initial install, our only chance when importing it back is that it 268 # works again! 
269 self.op.force_variant = True 270 271 if self.op.no_install: 272 self.LogInfo("No-installation mode has no effect during import") 273 274 if objects.GetOSImage(self.op.osparams): 275 self.LogInfo("OS image has no effect during import") 276 elif self.op.mode == constants.INSTANCE_CREATE: 277 os_image = CheckOSImage(self.op) 278 279 if self.op.os_type is None and os_image is None: 280 raise errors.OpPrereqError("No guest OS or OS image specified", 281 errors.ECODE_INVAL) 282 283 if self.op.os_type is not None \ 284 and self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os: 285 raise errors.OpPrereqError("Guest OS '%s' is not allowed for" 286 " installation" % self.op.os_type, 287 errors.ECODE_STATE) 288 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 289 if objects.GetOSImage(self.op.osparams): 290 self.LogInfo("OS image has no effect during import") 291 292 self._cds = GetClusterDomainSecret() 293 294 # Check handshake to ensure both clusters have the same domain secret 295 src_handshake = self.op.source_handshake 296 if not src_handshake: 297 raise errors.OpPrereqError("Missing source handshake", 298 errors.ECODE_INVAL) 299 300 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds, 301 src_handshake) 302 if errmsg: 303 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg, 304 errors.ECODE_INVAL) 305 306 # Load and check source CA 307 self.source_x509_ca_pem = self.op.source_x509_ca 308 if not self.source_x509_ca_pem: 309 raise errors.OpPrereqError("Missing source X509 CA", 310 errors.ECODE_INVAL) 311 312 try: 313 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem, 314 self._cds) 315 except OpenSSL.crypto.Error, err: 316 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" % 317 (err, ), errors.ECODE_INVAL) 318 319 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) 320 if errcode is not None: 321 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ), 322 errors.ECODE_INVAL) 323 324 self.source_x509_ca = cert 325 326 src_instance_name = self.op.source_instance_name 327 if not src_instance_name: 328 raise errors.OpPrereqError("Missing source instance name", 329 errors.ECODE_INVAL) 330 331 self.source_instance_name = \ 332 netutils.GetHostname(name=src_instance_name).name 333 334 else: 335 raise errors.OpPrereqError("Invalid instance creation mode %r" % 336 self.op.mode, errors.ECODE_INVAL)
337
338 - def ExpandNames(self):
339 """ExpandNames for CreateInstance. 340 341 Figure out the right locks for instance creation. 342 343 """ 344 self.needed_locks = {} 345 346 if self.op.commit: 347 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 348 if name is None: 349 raise errors.OpPrereqError("Instance %s unknown" % 350 self.op.instance_name, 351 errors.ECODE_INVAL) 352 self.op.instance_name = name 353 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 354 raise errors.OpPrereqError("Instance %s (with uuid %s) not forthcoming" 355 " but --commit was passed." % (name, uuid), 356 errors.ECODE_STATE) 357 logging.debug("Verified that instance %s with uuid %s is forthcoming", 358 name, uuid) 359 else: 360 # this is just a preventive check, but someone might still add this 361 # instance in the meantime; we check again in CheckPrereq 362 CheckInstanceExistence(self, self.op.instance_name) 363 364 self.add_locks[locking.LEVEL_INSTANCE] = self.op.instance_name 365 366 if self.op.commit: 367 (uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 368 self.needed_locks[locking.LEVEL_NODE] = self.cfg.GetInstanceNodes(uuid) 369 logging.debug("Forthcoming instance %s resides on %s", uuid, 370 self.needed_locks[locking.LEVEL_NODE]) 371 elif self.op.iallocator: 372 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by 373 # specifying a group on instance creation and then selecting nodes from 374 # that group 375 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 376 377 if self.op.opportunistic_locking: 378 self.opportunistic_locks[locking.LEVEL_NODE] = True 379 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True 380 if self.op.disk_template == constants.DT_DRBD8: 381 self.opportunistic_locks_count[locking.LEVEL_NODE] = 2 382 self.opportunistic_locks_count[locking.LEVEL_NODE_RES] = 2 383 else: 384 (self.op.pnode_uuid, self.op.pnode) = \ 385 ExpandNodeUuidAndName(self.cfg, self.op.pnode_uuid, self.op.pnode) 386 nodelist = [self.op.pnode_uuid] 387 if self.op.snode is not None: 388 (self.op.snode_uuid, self.op.snode) = \ 389 ExpandNodeUuidAndName(self.cfg, self.op.snode_uuid, self.op.snode) 390 nodelist.append(self.op.snode_uuid) 391 self.needed_locks[locking.LEVEL_NODE] = nodelist 392 393 # in case of import lock the source node too 394 if self.op.mode == constants.INSTANCE_IMPORT: 395 src_node = self.op.src_node 396 src_path = self.op.src_path 397 398 if src_path is None: 399 self.op.src_path = src_path = self.op.instance_name 400 401 if src_node is None: 402 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 403 self.op.src_node = None 404 if os.path.isabs(src_path): 405 raise errors.OpPrereqError("Importing an instance from a path" 406 " requires a source node option", 407 errors.ECODE_INVAL) 408 else: 409 (self.op.src_node_uuid, self.op.src_node) = (_, src_node) = \ 410 ExpandNodeUuidAndName(self.cfg, self.op.src_node_uuid, src_node) 411 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: 412 self.needed_locks[locking.LEVEL_NODE].append(self.op.src_node_uuid) 413 if not os.path.isabs(src_path): 414 self.op.src_path = \ 415 utils.PathJoin(pathutils.EXPORT_DIR, src_path) 416 417 self.needed_locks[locking.LEVEL_NODE_RES] = \ 418 CopyLockList(self.needed_locks[locking.LEVEL_NODE]) 419 420 # Optimistically acquire shared group locks (we're reading the 421 # configuration). We can't just call GetInstanceNodeGroups, because the 422 # instance doesn't exist yet. Therefore we lock all node groups of all 423 # nodes we have. 
424 if self.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET: 425 # In the case we lock all nodes for opportunistic allocation, we have no 426 # choice than to lock all groups, because they're allocated before nodes. 427 # This is sad, but true. At least we release all those we don't need in 428 # CheckPrereq later. 429 self.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET 430 else: 431 self.needed_locks[locking.LEVEL_NODEGROUP] = \ 432 list(self.cfg.GetNodeGroupsFromNodes( 433 self.needed_locks[locking.LEVEL_NODE])) 434 self.share_locks[locking.LEVEL_NODEGROUP] = 1
435
436 - def DeclareLocks(self, level):
 437      if level == locking.LEVEL_NODE_RES: 
 438        if self.op.opportunistic_locking: 
 439          self.needed_locks[locking.LEVEL_NODE_RES] = \ 
 440            CopyLockList(list(self.owned_locks(locking.LEVEL_NODE))) 
441
442 - def _RunAllocator(self):
443 """Run the allocator based on input opcode. 444 445 """ 446 if self.op.opportunistic_locking: 447 # Only consider nodes for which a lock is held 448 node_name_whitelist = self.cfg.GetNodeNames( 449 set(self.owned_locks(locking.LEVEL_NODE)) & 450 set(self.owned_locks(locking.LEVEL_NODE_RES))) 451 logging.debug("Trying to allocate on nodes %s", node_name_whitelist) 452 else: 453 node_name_whitelist = None 454 455 req = CreateInstanceAllocRequest(self.op, self.disks, 456 self.nics, self.be_full, 457 node_name_whitelist) 458 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 459 460 ial.Run(self.op.iallocator) 461 462 if not ial.success: 463 # When opportunistic locks are used only a temporary failure is generated 464 if self.op.opportunistic_locking: 465 ecode = errors.ECODE_TEMP_NORES 466 self.LogInfo("IAllocator '%s' failed on opportunistically acquired" 467 " nodes: %s", self.op.iallocator, ial.info) 468 else: 469 ecode = errors.ECODE_NORES 470 471 raise errors.OpPrereqError("Can't compute nodes using" 472 " iallocator '%s': %s" % 473 (self.op.iallocator, ial.info), 474 ecode) 475 476 (self.op.pnode_uuid, self.op.pnode) = \ 477 ExpandNodeUuidAndName(self.cfg, None, ial.result[0]) 478 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", 479 self.op.instance_name, self.op.iallocator, 480 utils.CommaJoin(ial.result)) 481 482 assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator" 483 484 if req.RequiredNodes() == 2: 485 (self.op.snode_uuid, self.op.snode) = \ 486 ExpandNodeUuidAndName(self.cfg, None, ial.result[1])
487
488 - def BuildHooksEnv(self):
489 """Build hooks env. 490 491 This runs on master, primary and secondary nodes of the instance. 492 493 """ 494 env = { 495 "ADD_MODE": self.op.mode, 496 } 497 if self.op.mode == constants.INSTANCE_IMPORT: 498 env["SRC_NODE"] = self.op.src_node 499 env["SRC_PATH"] = self.op.src_path 500 env["SRC_IMAGES"] = self.src_images 501 502 env.update(BuildInstanceHookEnv( 503 name=self.op.instance_name, 504 primary_node_name=self.op.pnode, 505 secondary_node_names=self.cfg.GetNodeNames(self.secondaries), 506 status=self.op.start, 507 os_type=self.op.os_type, 508 minmem=self.be_full[constants.BE_MINMEM], 509 maxmem=self.be_full[constants.BE_MAXMEM], 510 vcpus=self.be_full[constants.BE_VCPUS], 511 nics=NICListToTuple(self, self.nics), 512 disk_template=self.op.disk_template, 513 # Note that self.disks here is not a list with objects.Disk 514 # but with dicts as returned by ComputeDisks. 515 disks=self.disks, 516 bep=self.be_full, 517 hvp=self.hv_full, 518 hypervisor_name=self.op.hypervisor, 519 tags=self.op.tags, 520 )) 521 522 return env
523
524 - def BuildHooksNodes(self):
525 """Build hooks nodes. 526 527 """ 528 nl = [self.cfg.GetMasterNode(), self.op.pnode_uuid] + self.secondaries 529 return nl, nl
530
531 - def _ReadExportInfo(self):
532 """Reads the export information from disk. 533 534 It will override the opcode source node and path with the actual 535 information, if these two were not specified before. 536 537 @return: the export information 538 539 """ 540 assert self.op.mode == constants.INSTANCE_IMPORT 541 542 if self.op.src_node_uuid is None: 543 locked_nodes = self.owned_locks(locking.LEVEL_NODE) 544 exp_list = self.rpc.call_export_list(locked_nodes) 545 found = False 546 for node_uuid in exp_list: 547 if exp_list[node_uuid].fail_msg: 548 continue 549 if self.op.src_path in exp_list[node_uuid].payload: 550 found = True 551 self.op.src_node = self.cfg.GetNodeInfo(node_uuid).name 552 self.op.src_node_uuid = node_uuid 553 self.op.src_path = utils.PathJoin(pathutils.EXPORT_DIR, 554 self.op.src_path) 555 break 556 if not found: 557 raise errors.OpPrereqError("No export found for relative path %s" % 558 self.op.src_path, errors.ECODE_INVAL) 559 560 CheckNodeOnline(self, self.op.src_node_uuid) 561 result = self.rpc.call_export_info(self.op.src_node_uuid, self.op.src_path) 562 result.Raise("No export or invalid export found in dir %s" % 563 self.op.src_path) 564 565 export_info = objects.SerializableConfigParser.Loads(str(result.payload)) 566 if not export_info.has_section(constants.INISECT_EXP): 567 raise errors.ProgrammerError("Corrupted export config", 568 errors.ECODE_ENVIRON) 569 570 ei_version = export_info.get(constants.INISECT_EXP, "version") 571 if int(ei_version) != constants.EXPORT_VERSION: 572 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % 573 (ei_version, constants.EXPORT_VERSION), 574 errors.ECODE_ENVIRON) 575 return export_info
576
577 - def _ReadExportParams(self, einfo):
578 """Use export parameters as defaults. 579 580 In case the opcode doesn't specify (as in override) some instance 581 parameters, then try to use them from the export information, if 582 that declares them. 583 584 """ 585 self.op.os_type = einfo.get(constants.INISECT_EXP, "os") 586 587 if not self.op.disks: 588 disks = [] 589 # TODO: import the disk iv_name too 590 for idx in range(constants.MAX_DISKS): 591 if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx): 592 disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) 593 disk_name = einfo.get(constants.INISECT_INS, "disk%d_name" % idx) 594 disk = { 595 constants.IDISK_SIZE: disk_sz, 596 constants.IDISK_NAME: disk_name 597 } 598 disks.append(disk) 599 self.op.disks = disks 600 if not disks and self.op.disk_template != constants.DT_DISKLESS: 601 raise errors.OpPrereqError("No disk info specified and the export" 602 " is missing the disk information", 603 errors.ECODE_INVAL) 604 605 if not self.op.nics: 606 nics = [] 607 for idx in range(constants.MAX_NICS): 608 if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx): 609 ndict = {} 610 for name in [constants.INIC_IP, 611 constants.INIC_MAC, constants.INIC_NAME]: 612 nic_param_name = "nic%d_%s" % (idx, name) 613 if einfo.has_option(constants.INISECT_INS, nic_param_name): 614 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) 615 ndict[name] = v 616 network = einfo.get(constants.INISECT_INS, 617 "nic%d_%s" % (idx, constants.INIC_NETWORK)) 618 # in case network is given link and mode are inherited 619 # from nodegroup's netparams and thus should not be passed here 620 if network: 621 ndict[constants.INIC_NETWORK] = network 622 else: 623 for name in list(constants.NICS_PARAMETERS): 624 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) 625 ndict[name] = v 626 nics.append(ndict) 627 else: 628 break 629 self.op.nics = nics 630 631 if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): 632 self.op.tags = einfo.get(constants.INISECT_INS, "tags").split() 633 634 if (self.op.hypervisor is None and 635 einfo.has_option(constants.INISECT_INS, "hypervisor")): 636 self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor") 637 638 if einfo.has_section(constants.INISECT_HYP): 639 # use the export parameters but do not override the ones 640 # specified by the user 641 for name, value in einfo.items(constants.INISECT_HYP): 642 if name not in self.op.hvparams: 643 self.op.hvparams[name] = value 644 645 if einfo.has_section(constants.INISECT_BEP): 646 # use the parameters, without overriding 647 for name, value in einfo.items(constants.INISECT_BEP): 648 if name not in self.op.beparams: 649 self.op.beparams[name] = value 650 # Compatibility for the old "memory" be param 651 if name == constants.BE_MEMORY: 652 if constants.BE_MAXMEM not in self.op.beparams: 653 self.op.beparams[constants.BE_MAXMEM] = value 654 if constants.BE_MINMEM not in self.op.beparams: 655 self.op.beparams[constants.BE_MINMEM] = value 656 else: 657 # try to read the parameters old style, from the main section 658 for name in constants.BES_PARAMETERS: 659 if (name not in self.op.beparams and 660 einfo.has_option(constants.INISECT_INS, name)): 661 self.op.beparams[name] = einfo.get(constants.INISECT_INS, name) 662 663 if einfo.has_section(constants.INISECT_OSP): 664 # use the parameters, without overriding 665 for name, value in einfo.items(constants.INISECT_OSP): 666 if name not in self.op.osparams: 667 self.op.osparams[name] = value 668 669 if 
einfo.has_section(constants.INISECT_OSP_PRIVATE): 670 # use the parameters, without overriding 671 for name, value in einfo.items(constants.INISECT_OSP_PRIVATE): 672 if name not in self.op.osparams_private: 673 self.op.osparams_private[name] = serializer.Private(value, descr=name)
674
675 - def _RevertToDefaults(self, cluster):
676 """Revert the instance parameters to the default values. 677 678 """ 679 # hvparams 680 hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {}) 681 for name in self.op.hvparams.keys(): 682 if name in hv_defs and hv_defs[name] == self.op.hvparams[name]: 683 del self.op.hvparams[name] 684 # beparams 685 be_defs = cluster.SimpleFillBE({}) 686 for name in self.op.beparams.keys(): 687 if name in be_defs and be_defs[name] == self.op.beparams[name]: 688 del self.op.beparams[name] 689 # nic params 690 nic_defs = cluster.SimpleFillNIC({}) 691 for nic in self.op.nics: 692 for name in constants.NICS_PARAMETERS: 693 if name in nic and name in nic_defs and nic[name] == nic_defs[name]: 694 del nic[name] 695 # osparams 696 os_defs = cluster.SimpleFillOS(self.op.os_type, {}) 697 for name in self.op.osparams.keys(): 698 if name in os_defs and os_defs[name] == self.op.osparams[name]: 699 del self.op.osparams[name] 700 701 os_defs_ = cluster.SimpleFillOS(self.op.os_type, {}, 702 os_params_private={}) 703 for name in self.op.osparams_private.keys(): 704 if name in os_defs_ and os_defs_[name] == self.op.osparams_private[name]: 705 del self.op.osparams_private[name]
706
708 """Set nodes as in the forthcoming instance 709 710 """ 711 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 712 inst = self.cfg.GetInstanceInfo(uuid) 713 self.op.pnode_uuid = inst.primary_node 714 self.op.pnode = self.cfg.GetNodeName(inst.primary_node) 715 sec_nodes = self.cfg.GetInstanceSecondaryNodes(uuid) 716 node_names = [self.op.pnode] 717 if sec_nodes: 718 self.op.snode_uuid = sec_nodes[0] 719 self.op.snode = self.cfg.GetNodeName(sec_nodes[0]) 720 node_names.append(self.op.snode) 721 self.LogInfo("Nodes of instance %s: %s", name, node_names)
722
723 - def CheckPrereq(self): # pylint: disable=R0914
724 """Check prerequisites. 725 726 """ 727 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) 728 729 if self.op.commit: 730 # Check that the instance is still on the cluster, forthcoming, and 731 # still resides on the nodes we acquired. 732 (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) 733 if uuid is None: 734 raise errors.OpPrereqError("Instance %s disappeared from the cluster" 735 " while waiting for locks" 736 % (self.op.instance_name,), 737 errors.ECODE_STATE) 738 if not self.cfg.GetInstanceInfo(uuid).forthcoming: 739 raise errors.OpPrereqError("Instance %s (with uuid %s) is no longer" 740 " forthcoming" % (name, uuid), 741 errors.ECODE_STATE) 742 required_nodes = self.cfg.GetInstanceNodes(uuid) 743 if not owned_nodes.issuperset(required_nodes): 744 raise errors.OpPrereqError("Forthcoming instance %s nodes changed" 745 " since locks were acquired; retry the" 746 " operation" % self.op.instance_name, 747 errors.ECODE_STATE) 748 else: 749 CheckInstanceExistence(self, self.op.instance_name) 750 751 # Check that the optimistically acquired groups are correct wrt the 752 # acquired nodes 753 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) 754 cur_groups = list(self.cfg.GetNodeGroupsFromNodes(owned_nodes)) 755 if not owned_groups.issuperset(cur_groups): 756 raise errors.OpPrereqError("New instance %s's node groups changed since" 757 " locks were acquired, current groups are" 758 " are '%s', owning groups '%s'; retry the" 759 " operation" % 760 (self.op.instance_name, 761 utils.CommaJoin(cur_groups), 762 utils.CommaJoin(owned_groups)), 763 errors.ECODE_STATE) 764 765 self.instance_file_storage_dir = CalculateFileStorageDir( 766 self.op.disk_template, self.cfg, self.op.instance_name, 767 self.op.file_storage_dir) 768 769 if self.op.mode == constants.INSTANCE_IMPORT: 770 export_info = self._ReadExportInfo() 771 self._ReadExportParams(export_info) 772 self._old_instance_name = export_info.get(constants.INISECT_INS, "name") 773 else: 774 self._old_instance_name = None 775 776 if (not self.cfg.GetVGName() and 777 self.op.disk_template not in constants.DTS_NOT_LVM): 778 raise errors.OpPrereqError("Cluster does not support lvm-based" 779 " instances", errors.ECODE_STATE) 780 781 if (self.op.hypervisor is None or 782 self.op.hypervisor == constants.VALUE_AUTO): 783 self.op.hypervisor = self.cfg.GetHypervisorType() 784 785 cluster = self.cfg.GetClusterInfo() 786 enabled_hvs = cluster.enabled_hypervisors 787 if self.op.hypervisor not in enabled_hvs: 788 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" 789 " cluster (%s)" % 790 (self.op.hypervisor, ",".join(enabled_hvs)), 791 errors.ECODE_STATE) 792 793 # Check tag validity 794 for tag in self.op.tags: 795 objects.TaggableObject.ValidateTag(tag) 796 797 # check hypervisor parameter syntax (locally) 798 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) 799 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, 800 self.op.hvparams) 801 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor) 802 hv_type.CheckParameterSyntax(filled_hvp) 803 self.hv_full = filled_hvp 804 # check that we don't specify global parameters on an instance 805 CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor", 806 "instance", "cluster") 807 808 # fill and remember the beparams dict 809 self.be_full = ComputeFullBeParams(self.op, cluster) 810 811 # build os parameters 812 if self.op.osparams_private is None: 813 self.op.osparams_private = 
serializer.PrivateDict() 814 if self.op.osparams_secret is None: 815 self.op.osparams_secret = serializer.PrivateDict() 816 817 self.os_full = cluster.SimpleFillOS( 818 self.op.os_type, 819 self.op.osparams, 820 os_params_private=self.op.osparams_private, 821 os_params_secret=self.op.osparams_secret 822 ) 823 824 # now that hvp/bep are in final format, let's reset to defaults, 825 # if told to do so 826 if self.op.identify_defaults: 827 self._RevertToDefaults(cluster) 828 829 # NIC buildup 830 self.nics = ComputeNics(self.op, cluster, self.check_ip, self.cfg, 831 self.proc.GetECId()) 832 833 # disk checks/pre-build 834 default_vg = self.cfg.GetVGName() 835 self.disks = ComputeDisks(self.op.disks, self.op.disk_template, default_vg) 836 837 if self.op.mode == constants.INSTANCE_IMPORT: 838 disk_images = [] 839 for idx in range(len(self.disks)): 840 option = "disk%d_dump" % idx 841 if export_info.has_option(constants.INISECT_INS, option): 842 # FIXME: are the old os-es, disk sizes, etc. useful? 843 export_name = export_info.get(constants.INISECT_INS, option) 844 image = utils.PathJoin(self.op.src_path, export_name) 845 disk_images.append(image) 846 else: 847 disk_images.append(False) 848 849 self.src_images = disk_images 850 851 if self.op.instance_name == self._old_instance_name: 852 for idx, nic in enumerate(self.nics): 853 if nic.mac == constants.VALUE_AUTO: 854 nic_mac_ini = "nic%d_mac" % idx 855 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) 856 857 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT 858 859 # ip ping checks (we use the same ip that was resolved in ExpandNames) 860 if self.op.ip_check: 861 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): 862 raise errors.OpPrereqError("IP %s of instance %s already in use" % 863 (self.check_ip, self.op.instance_name), 864 errors.ECODE_NOTUNIQUE) 865 866 #### mac address generation 867 # By generating here the mac address both the allocator and the hooks get 868 # the real final mac address rather than the 'auto' or 'generate' value. 869 # There is a race condition between the generation and the instance object 870 # creation, which means that we know the mac is valid now, but we're not 871 # sure it will be when we actually add the instance. If things go bad 872 # adding the instance will abort because of a duplicate mac, and the 873 # creation job will fail. 
874 for nic in self.nics: 875 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): 876 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) 877 878 #### allocator run 879 880 if self.op.iallocator is not None: 881 if self.op.commit: 882 self._GetNodesFromForthcomingInstance() 883 else: 884 self._RunAllocator() 885 886 # Release all unneeded node locks 887 keep_locks = filter(None, [self.op.pnode_uuid, self.op.snode_uuid, 888 self.op.src_node_uuid]) 889 ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) 890 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) 891 # Release all unneeded group locks 892 ReleaseLocks(self, locking.LEVEL_NODEGROUP, 893 keep=self.cfg.GetNodeGroupsFromNodes(keep_locks)) 894 895 assert (self.owned_locks(locking.LEVEL_NODE) == 896 self.owned_locks(locking.LEVEL_NODE_RES)), \ 897 ("Node locks differ from node resource locks (%s vs %s)" 898 % (self.owned_locks(locking.LEVEL_NODE), 899 self.owned_locks(locking.LEVEL_NODE_RES))) 900 901 #### node related checks 902 903 # check primary node 904 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode_uuid) 905 assert self.pnode is not None, \ 906 "Cannot retrieve locked node %s" % self.op.pnode_uuid 907 if pnode.offline: 908 raise errors.OpPrereqError("Cannot use offline primary node '%s'" % 909 pnode.name, errors.ECODE_STATE) 910 if pnode.drained: 911 raise errors.OpPrereqError("Cannot use drained primary node '%s'" % 912 pnode.name, errors.ECODE_STATE) 913 if not pnode.vm_capable: 914 raise errors.OpPrereqError("Cannot use non-vm_capable primary node" 915 " '%s'" % pnode.name, errors.ECODE_STATE) 916 917 self.secondaries = [] 918 919 # Fill in any IPs from IP pools. This must happen here, because we need to 920 # know the nic's primary node, as specified by the iallocator 921 for idx, nic in enumerate(self.nics): 922 net_uuid = nic.network 923 if net_uuid is not None: 924 nobj = self.cfg.GetNetwork(net_uuid) 925 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.uuid) 926 if netparams is None: 927 raise errors.OpPrereqError("No netparams found for network" 928 " %s. 
Probably not connected to" 929 " node's %s nodegroup" % 930 (nobj.name, self.pnode.name), 931 errors.ECODE_INVAL) 932 self.LogInfo("NIC/%d inherits netparams %s" % 933 (idx, netparams.values())) 934 nic.nicparams = dict(netparams) 935 if nic.ip is not None: 936 if nic.ip.lower() == constants.NIC_IP_POOL: 937 try: 938 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId()) 939 except errors.ReservationError: 940 raise errors.OpPrereqError("Unable to get a free IP for NIC %d" 941 " from the address pool" % idx, 942 errors.ECODE_STATE) 943 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name) 944 else: 945 try: 946 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId(), 947 check=self.op.conflicts_check) 948 except errors.ReservationError: 949 raise errors.OpPrereqError("IP address %s already in use" 950 " or does not belong to network %s" % 951 (nic.ip, nobj.name), 952 errors.ECODE_NOTUNIQUE) 953 954 # net is None, ip None or given 955 elif self.op.conflicts_check: 956 CheckForConflictingIp(self, nic.ip, self.pnode.uuid) 957 958 # mirror node verification 959 if self.op.disk_template in constants.DTS_INT_MIRROR: 960 if self.op.snode_uuid == pnode.uuid: 961 raise errors.OpPrereqError("The secondary node cannot be the" 962 " primary node", errors.ECODE_INVAL) 963 CheckNodeOnline(self, self.op.snode_uuid) 964 CheckNodeNotDrained(self, self.op.snode_uuid) 965 CheckNodeVmCapable(self, self.op.snode_uuid) 966 self.secondaries.append(self.op.snode_uuid) 967 968 snode = self.cfg.GetNodeInfo(self.op.snode_uuid) 969 if pnode.group != snode.group: 970 self.LogWarning("The primary and secondary nodes are in two" 971 " different node groups; the disk parameters" 972 " from the first disk's node group will be" 973 " used") 974 975 nodes = [pnode] 976 if self.op.disk_template in constants.DTS_INT_MIRROR: 977 nodes.append(snode) 978 has_es = lambda n: IsExclusiveStorageEnabledNode(self.cfg, n) 979 excl_stor = compat.any(map(has_es, nodes)) 980 if excl_stor and not self.op.disk_template in constants.DTS_EXCL_STORAGE: 981 raise errors.OpPrereqError("Disk template %s not supported with" 982 " exclusive storage" % self.op.disk_template, 983 errors.ECODE_STATE) 984 for disk in self.disks: 985 CheckSpindlesExclusiveStorage(disk, excl_stor, True) 986 987 node_uuids = [pnode.uuid] + self.secondaries 988 989 if not self.adopt_disks: 990 if self.op.disk_template == constants.DT_RBD: 991 # _CheckRADOSFreeSpace() is just a placeholder. 992 # Any function that checks prerequisites can be placed here. 993 # Check if there is enough space on the RADOS cluster. 
994 CheckRADOSFreeSpace() 995 elif self.op.disk_template == constants.DT_EXT: 996 # FIXME: Function that checks prereqs if needed 997 pass 998 elif self.op.disk_template in constants.DTS_LVM: 999 # Check lv size requirements, if not adopting 1000 req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks) 1001 CheckNodesFreeDiskPerVG(self, node_uuids, req_sizes) 1002 else: 1003 # FIXME: add checks for other, non-adopting, non-lvm disk templates 1004 pass 1005 1006 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data 1007 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], 1008 disk[constants.IDISK_ADOPT]) 1009 for disk in self.disks]) 1010 if len(all_lvs) != len(self.disks): 1011 raise errors.OpPrereqError("Duplicate volume names given for adoption", 1012 errors.ECODE_INVAL) 1013 for lv_name in all_lvs: 1014 try: 1015 # FIXME: lv_name here is "vg/lv" need to ensure that other calls 1016 # to ReserveLV uses the same syntax 1017 self.cfg.ReserveLV(lv_name, self.proc.GetECId()) 1018 except errors.ReservationError: 1019 raise errors.OpPrereqError("LV named %s used by another instance" % 1020 lv_name, errors.ECODE_NOTUNIQUE) 1021 1022 vg_names = self.rpc.call_vg_list([pnode.uuid])[pnode.uuid] 1023 vg_names.Raise("Cannot get VG information from node %s" % pnode.name, 1024 prereq=True) 1025 1026 node_lvs = self.rpc.call_lv_list([pnode.uuid], 1027 vg_names.payload.keys())[pnode.uuid] 1028 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name, 1029 prereq=True) 1030 node_lvs = node_lvs.payload 1031 1032 delta = all_lvs.difference(node_lvs.keys()) 1033 if delta: 1034 raise errors.OpPrereqError("Missing logical volume(s): %s" % 1035 utils.CommaJoin(delta), 1036 errors.ECODE_INVAL) 1037 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]] 1038 if online_lvs: 1039 raise errors.OpPrereqError("Online logical volumes found, cannot" 1040 " adopt: %s" % utils.CommaJoin(online_lvs), 1041 errors.ECODE_STATE) 1042 # update the size of disk based on what is found 1043 for dsk in self.disks: 1044 dsk[constants.IDISK_SIZE] = \ 1045 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG], 1046 dsk[constants.IDISK_ADOPT])][0])) 1047 1048 elif self.op.disk_template == constants.DT_BLOCK: 1049 # Normalize and de-duplicate device paths 1050 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT]) 1051 for disk in self.disks]) 1052 if len(all_disks) != len(self.disks): 1053 raise errors.OpPrereqError("Duplicate disk names given for adoption", 1054 errors.ECODE_INVAL) 1055 baddisks = [d for d in all_disks 1056 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)] 1057 if baddisks: 1058 raise errors.OpPrereqError("Device node(s) %s lie outside %s and" 1059 " cannot be adopted" % 1060 (utils.CommaJoin(baddisks), 1061 constants.ADOPTABLE_BLOCKDEV_ROOT), 1062 errors.ECODE_INVAL) 1063 1064 node_disks = self.rpc.call_bdev_sizes([pnode.uuid], 1065 list(all_disks))[pnode.uuid] 1066 node_disks.Raise("Cannot get block device information from node %s" % 1067 pnode.name, prereq=True) 1068 node_disks = node_disks.payload 1069 delta = all_disks.difference(node_disks.keys()) 1070 if delta: 1071 raise errors.OpPrereqError("Missing block device(s): %s" % 1072 utils.CommaJoin(delta), 1073 errors.ECODE_INVAL) 1074 for dsk in self.disks: 1075 dsk[constants.IDISK_SIZE] = \ 1076 int(float(node_disks[dsk[constants.IDISK_ADOPT]])) 1077 1078 # Check disk access param to be compatible with specified hypervisor 1079 node_info = self.cfg.GetNodeInfo(self.op.pnode_uuid) 1080 node_group = 
self.cfg.GetNodeGroup(node_info.group) 1081 group_disk_params = self.cfg.GetGroupDiskParams(node_group) 1082 group_access_type = group_disk_params[self.op.disk_template].get( 1083 constants.RBD_ACCESS, constants.DISK_KERNELSPACE 1084 ) 1085 for dsk in self.disks: 1086 access_type = dsk.get(constants.IDISK_ACCESS, group_access_type) 1087 if not IsValidDiskAccessModeCombination(self.op.hypervisor, 1088 self.op.disk_template, 1089 access_type): 1090 raise errors.OpPrereqError("Selected hypervisor (%s) cannot be" 1091 " used with %s disk access param" % 1092 (self.op.hypervisor, access_type), 1093 errors.ECODE_STATE) 1094 1095 # Verify instance specs 1096 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) 1097 ispec = { 1098 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), 1099 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), 1100 constants.ISPEC_DISK_COUNT: len(self.disks), 1101 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE] 1102 for disk in self.disks], 1103 constants.ISPEC_NIC_COUNT: len(self.nics), 1104 constants.ISPEC_SPINDLE_USE: spindle_use, 1105 } 1106 1107 group_info = self.cfg.GetNodeGroup(pnode.group) 1108 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) 1109 disk_types = [self.op.disk_template] * len(self.disks) 1110 res = ComputeIPolicyInstanceSpecViolation(ipolicy, ispec, disk_types) 1111 if not self.op.ignore_ipolicy and res: 1112 msg = ("Instance allocation to group %s (%s) violates policy: %s" % 1113 (pnode.group, group_info.name, utils.CommaJoin(res))) 1114 raise errors.OpPrereqError(msg, errors.ECODE_INVAL) 1115 1116 CheckHVParams(self, node_uuids, self.op.hypervisor, self.op.hvparams) 1117 1118 CheckOSParams(self, True, node_uuids, self.op.os_type, self.os_full, 1119 self.op.force_variant) 1120 1121 CheckNicsBridgesExist(self, self.nics, self.pnode.uuid) 1122 1123 CheckCompressionTool(self, self.op.compress) 1124 1125 #TODO: _CheckExtParams (remotely) 1126 # Check parameters for extstorage 1127 1128 # memory check on primary node 1129 #TODO(dynmem): use MINMEM for checking 1130 if self.op.start: 1131 hvfull = objects.FillDict(cluster.hvparams.get(self.op.hypervisor, {}), 1132 self.op.hvparams) 1133 CheckNodeFreeMemory(self, self.pnode.uuid, 1134 "creating instance %s" % self.op.instance_name, 1135 self.be_full[constants.BE_MAXMEM], 1136 self.op.hypervisor, hvfull) 1137 1138 self.dry_run_result = list(node_uuids)
1139
1140 - def _RemoveDegradedDisks(self, feedback_fn, disk_abort, instance):
1141 """Removes degraded disks and instance. 1142 1143 It optionally checks whether disks are degraded. If the disks are 1144 degraded, they are removed and the instance is also removed from 1145 the configuration. 1146 1147 If L{disk_abort} is True, then the disks are considered degraded 1148 and removed, and the instance is removed from the configuration. 1149 1150 If L{disk_abort} is False, then it first checks whether disks are 1151 degraded and, if so, it removes the disks and the instance is 1152 removed from the configuration. 1153 1154 @type feedback_fn: callable 1155 @param feedback_fn: function used send feedback back to the caller 1156 1157 @type disk_abort: boolean 1158 @param disk_abort: 1159 True if disks are degraded, False to first check if disks are 1160 degraded 1161 @type instance: L{objects.Instance} 1162 @param instance: instance containing the disks to check 1163 1164 @rtype: NoneType 1165 @return: None 1166 @raise errors.OpPrereqError: if disks are degraded 1167 1168 """ 1169 disk_info = self.cfg.GetInstanceDisks(instance.uuid) 1170 if disk_abort: 1171 pass 1172 elif self.op.wait_for_sync: 1173 disk_abort = not WaitForSync(self, instance) 1174 elif utils.AnyDiskOfType(disk_info, constants.DTS_INT_MIRROR): 1175 # make sure the disks are not degraded (still sync-ing is ok) 1176 feedback_fn("* checking mirrors status") 1177 disk_abort = not WaitForSync(self, instance, oneshot=True) 1178 else: 1179 disk_abort = False 1180 1181 if disk_abort: 1182 RemoveDisks(self, instance) 1183 for disk_uuid in instance.disks: 1184 self.cfg.RemoveInstanceDisk(instance.uuid, disk_uuid) 1185 self.cfg.RemoveInstance(instance.uuid) 1186 raise errors.OpExecError("There are some degraded disks for" 1187 " this instance")
1188
1189 - def RunOsScripts(self, feedback_fn, iobj):
1190 """Run OS scripts 1191 1192 If necessary, disks are paused. It handles instance create, 1193 import, and remote import. 1194 1195 @type feedback_fn: callable 1196 @param feedback_fn: function used send feedback back to the caller 1197 1198 @type iobj: L{objects.Instance} 1199 @param iobj: instance object 1200 1201 """ 1202 if iobj.disks and not self.adopt_disks: 1203 disks = self.cfg.GetInstanceDisks(iobj.uuid) 1204 if self.op.mode == constants.INSTANCE_CREATE: 1205 os_image = objects.GetOSImage(self.op.osparams) 1206 1207 if os_image is None and not self.op.no_install: 1208 pause_sync = (not self.op.wait_for_sync and 1209 utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)) 1210 if pause_sync: 1211 feedback_fn("* pausing disk sync to install instance OS") 1212 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1213 (disks, iobj), 1214 True) 1215 for idx, success in enumerate(result.payload): 1216 if not success: 1217 logging.warn("pause-sync of instance %s for disk %d failed", 1218 self.op.instance_name, idx) 1219 1220 feedback_fn("* running the instance OS create scripts...") 1221 # FIXME: pass debug option from opcode to backend 1222 os_add_result = \ 1223 self.rpc.call_instance_os_add(self.pnode.uuid, 1224 (iobj, self.op.osparams_secret), 1225 False, 1226 self.op.debug_level) 1227 if pause_sync: 1228 feedback_fn("* resuming disk sync") 1229 result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, 1230 (disks, iobj), 1231 False) 1232 for idx, success in enumerate(result.payload): 1233 if not success: 1234 logging.warn("resume-sync of instance %s for disk %d failed", 1235 self.op.instance_name, idx) 1236 1237 os_add_result.Raise("Could not add os for instance %s" 1238 " on node %s" % (self.op.instance_name, 1239 self.pnode.name)) 1240 1241 else: 1242 if self.op.mode == constants.INSTANCE_IMPORT: 1243 feedback_fn("* running the instance OS import scripts...") 1244 1245 transfers = [] 1246 1247 for idx, image in enumerate(self.src_images): 1248 if not image: 1249 continue 1250 1251 if iobj.os: 1252 dst_io = constants.IEIO_SCRIPT 1253 dst_ioargs = ((disks[idx], iobj), idx) 1254 else: 1255 dst_io = constants.IEIO_RAW_DISK 1256 dst_ioargs = (disks[idx], iobj) 1257 1258 # FIXME: pass debug option from opcode to backend 1259 dt = masterd.instance.DiskTransfer("disk/%s" % idx, 1260 constants.IEIO_FILE, (image, ), 1261 dst_io, dst_ioargs, 1262 None) 1263 transfers.append(dt) 1264 1265 import_result = \ 1266 masterd.instance.TransferInstanceData(self, feedback_fn, 1267 self.op.src_node_uuid, 1268 self.pnode.uuid, 1269 self.pnode.secondary_ip, 1270 self.op.compress, 1271 iobj, transfers) 1272 if not compat.all(import_result): 1273 self.LogWarning("Some disks for instance %s on node %s were not" 1274 " imported successfully" % (self.op.instance_name, 1275 self.pnode.name)) 1276 1277 rename_from = self._old_instance_name 1278 1279 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: 1280 feedback_fn("* preparing remote import...") 1281 # The source cluster will stop the instance before attempting to make 1282 # a connection. In some cases stopping an instance can take a long 1283 # time, hence the shutdown timeout is added to the connection 1284 # timeout. 
1285 connect_timeout = (constants.RIE_CONNECT_TIMEOUT + 1286 self.op.source_shutdown_timeout) 1287 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) 1288 1289 assert iobj.primary_node == self.pnode.uuid 1290 disk_results = \ 1291 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, 1292 self.source_x509_ca, 1293 self._cds, self.op.compress, timeouts) 1294 if not compat.all(disk_results): 1295 # TODO: Should the instance still be started, even if some disks 1296 # failed to import (valid for local imports, too)? 1297 self.LogWarning("Some disks for instance %s on node %s were not" 1298 " imported successfully" % (self.op.instance_name, 1299 self.pnode.name)) 1300 1301 rename_from = self.source_instance_name 1302 1303 else: 1304 # also checked in the prereq part 1305 raise errors.ProgrammerError("Unknown OS initialization mode '%s'" 1306 % self.op.mode) 1307 1308 assert iobj.name == self.op.instance_name 1309 1310 # Run rename script on newly imported instance 1311 if iobj.os: 1312 feedback_fn("Running rename script for %s" % self.op.instance_name) 1313 result = self.rpc.call_instance_run_rename(self.pnode.uuid, iobj, 1314 rename_from, 1315 self.op.debug_level) 1316 result.Warn("Failed to run rename script for %s on node %s" % 1317 (self.op.instance_name, self.pnode.name), self.LogWarning)
1318
1319 - def GetOsInstallPackageEnvironment(self, instance, script):
1320 """Returns the OS scripts environment for the helper VM 1321 1322 @type instance: L{objects.Instance} 1323 @param instance: instance for which the OS scripts are run 1324 1325 @type script: string 1326 @param script: script to run (e.g., 1327 constants.OS_SCRIPT_CREATE_UNTRUSTED) 1328 1329 @rtype: dict of string to string 1330 @return: OS scripts environment for the helper VM 1331 1332 """ 1333 env = {"OS_SCRIPT": script} 1334 1335 # We pass only the instance's disks, not the helper VM's disks. 1336 if instance.hypervisor == constants.HT_KVM: 1337 prefix = "/dev/vd" 1338 elif instance.hypervisor in [constants.HT_XEN_PVM, constants.HT_XEN_HVM]: 1339 prefix = "/dev/xvd" 1340 else: 1341 raise errors.OpExecError("Cannot run OS scripts in a virtualized" 1342 " environment for hypervisor '%s'" 1343 % instance.hypervisor) 1344 1345 num_disks = len(self.cfg.GetInstanceDisks(instance.uuid)) 1346 1347 for idx, disk_label in enumerate(utils.GetDiskLabels(prefix, num_disks + 1, 1348 start=1)): 1349 env["DISK_%d_PATH" % idx] = disk_label 1350 1351 return env
1352
1353 - def UpdateInstanceOsInstallPackage(self, feedback_fn, instance, override_env):
1354 """Updates the OS parameter 'os-install-package' for an instance. 1355 1356 The OS install package is an archive containing an OS definition 1357 and a file containing the environment variables needed to run the 1358 OS scripts. 1359 1360 The OS install package is served by the metadata daemon to the 1361 instances, so the OS scripts can run inside the virtualized 1362 environment. 1363 1364 @type feedback_fn: callable 1365 @param feedback_fn: function used send feedback back to the caller 1366 1367 @type instance: L{objects.Instance} 1368 @param instance: instance for which the OS parameter 1369 'os-install-package' is updated 1370 1371 @type override_env: dict of string to string 1372 @param override_env: if supplied, it overrides the environment of 1373 the export OS scripts archive 1374 1375 """ 1376 if "os-install-package" in instance.osparams: 1377 feedback_fn("Using OS install package '%s'" % 1378 instance.osparams["os-install-package"]) 1379 else: 1380 result = self.rpc.call_os_export(instance.primary_node, instance, 1381 override_env) 1382 result.Raise("Could not export OS '%s'" % instance.os) 1383 instance.osparams["os-install-package"] = result.payload 1384 1385 feedback_fn("Created OS install package '%s'" % result.payload)
1386
1387 - def RunOsScriptsVirtualized(self, feedback_fn, instance):
1388 """Runs the OS scripts inside a safe virtualized environment. 1389 1390 The virtualized environment reuses the instance and temporarily 1391 creates a disk onto which the image of the helper VM is dumped. 1392 The temporary disk is used to boot the helper VM. The OS scripts 1393 are passed to the helper VM through the metadata daemon and the OS 1394 install package. 1395 1396 @type feedback_fn: callable 1397 @param feedback_fn: function used send feedback back to the caller 1398 1399 @type instance: L{objects.Instance} 1400 @param instance: instance for which the OS scripts must be run 1401 inside the virtualized environment 1402 1403 """ 1404 install_image = self.cfg.GetInstallImage() 1405 1406 if not install_image: 1407 raise errors.OpExecError("Cannot create install instance because an" 1408 " install image has not been specified") 1409 1410 env = self.GetOsInstallPackageEnvironment( 1411 instance, 1412 constants.OS_SCRIPT_CREATE_UNTRUSTED) 1413 self.UpdateInstanceOsInstallPackage(feedback_fn, instance, env) 1414 UpdateMetadata(feedback_fn, self.rpc, instance, 1415 osparams_private=self.op.osparams_private, 1416 osparams_secret=self.op.osparams_secret) 1417 1418 RunWithHelperVM(self, instance, install_image, 1419 self.op.helper_startup_timeout, 1420 self.op.helper_shutdown_timeout, 1421 log_prefix="Running OS create script", 1422 feedback_fn=feedback_fn)
1423
1424 - def Exec(self, feedback_fn):
1425 """Create and add the instance to the cluster. 1426 1427 """ 1428 assert not (self.owned_locks(locking.LEVEL_NODE_RES) - 1429 self.owned_locks(locking.LEVEL_NODE)), \ 1430 "Node locks differ from node resource locks" 1431 1432 ht_kind = self.op.hypervisor 1433 if ht_kind in constants.HTS_REQ_PORT: 1434 network_port = self.cfg.AllocatePort() 1435 else: 1436 network_port = None 1437 1438 if self.op.commit: 1439 (instance_uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) 1440 else: 1441 instance_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) 1442 1443 # This is ugly but we got a chicken-egg problem here 1444 # We can only take the group disk parameters, as the instance 1445 # has no disks yet (we are generating them right here). 1446 nodegroup = self.cfg.GetNodeGroup(self.pnode.group) 1447 1448 if self.op.commit: 1449 disks = self.cfg.GetInstanceDisks(instance_uuid) 1450 CommitDisks(disks) 1451 else: 1452 disks = GenerateDiskTemplate(self, 1453 self.op.disk_template, 1454 instance_uuid, self.pnode.uuid, 1455 self.secondaries, 1456 self.disks, 1457 self.instance_file_storage_dir, 1458 self.op.file_driver, 1459 0, 1460 feedback_fn, 1461 self.cfg.GetGroupDiskParams(nodegroup), 1462 forthcoming=self.op.forthcoming) 1463 1464 if self.op.os_type is None: 1465 os_type = "" 1466 else: 1467 os_type = self.op.os_type 1468 1469 iobj = objects.Instance(name=self.op.instance_name, 1470 uuid=instance_uuid, 1471 os=os_type, 1472 primary_node=self.pnode.uuid, 1473 nics=self.nics, disks=[], 1474 disk_template=self.op.disk_template, 1475 disks_active=False, 1476 admin_state=constants.ADMINST_DOWN, 1477 admin_state_source=constants.ADMIN_SOURCE, 1478 network_port=network_port, 1479 beparams=self.op.beparams, 1480 hvparams=self.op.hvparams, 1481 hypervisor=self.op.hypervisor, 1482 osparams=self.op.osparams, 1483 osparams_private=self.op.osparams_private, 1484 forthcoming=self.op.forthcoming, 1485 ) 1486 1487 if self.op.tags: 1488 for tag in self.op.tags: 1489 iobj.AddTag(tag) 1490 1491 if self.adopt_disks: 1492 if self.op.disk_template == constants.DT_PLAIN: 1493 # rename LVs to the newly-generated names; we need to construct 1494 # 'fake' LV disks with the old data, plus the new unique_id 1495 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] 1496 rename_to = [] 1497 for t_dsk, a_dsk in zip(tmp_disks, self.disks): 1498 rename_to.append(t_dsk.logical_id) 1499 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) 1500 result = self.rpc.call_blockdev_rename(self.pnode.uuid, 1501 zip(tmp_disks, rename_to)) 1502 result.Raise("Failed to rename adoped LVs") 1503 elif self.op.forthcoming: 1504 feedback_fn("Instance is forthcoming, not creating disks") 1505 else: 1506 feedback_fn("* creating instance disks...") 1507 try: 1508 CreateDisks(self, iobj, disks=disks) 1509 except errors.OpExecError: 1510 self.LogWarning("Device creation failed") 1511 for disk in disks: 1512 self.cfg.ReleaseDRBDMinors(disk.uuid) 1513 raise 1514 1515 feedback_fn("adding instance %s to cluster config" % self.op.instance_name) 1516 self.cfg.AddInstance(iobj, self.proc.GetECId(), replace=self.op.commit) 1517 1518 feedback_fn("adding disks to cluster config") 1519 for disk in disks: 1520 self.cfg.AddInstanceDisk(iobj.uuid, disk, replace=self.op.commit) 1521 1522 if self.op.forthcoming: 1523 feedback_fn("Instance is forthcoming; not creating the actual instance") 1524 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid))) 1525 1526 # re-read the instance from the configuration 
1527 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1528 1529 if self.op.mode == constants.INSTANCE_IMPORT: 1530 # Release unused nodes 1531 ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node_uuid]) 1532 else: 1533 # Release all nodes 1534 ReleaseLocks(self, locking.LEVEL_NODE) 1535 1536 # Wipe disks 1537 disk_abort = False 1538 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: 1539 feedback_fn("* wiping instance disks...") 1540 try: 1541 WipeDisks(self, iobj) 1542 except errors.OpExecError, err: 1543 logging.exception("Wiping disks failed") 1544 self.LogWarning("Wiping instance disks failed (%s)", err) 1545 disk_abort = True 1546 1547 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1548 1549 # Image disks 1550 os_image = objects.GetOSImage(iobj.osparams) 1551 disk_abort = False 1552 1553 if not self.adopt_disks and os_image is not None: 1554 feedback_fn("* imaging instance disks...") 1555 try: 1556 ImageDisks(self, iobj, os_image) 1557 except errors.OpExecError, err: 1558 logging.exception("Imaging disks failed") 1559 self.LogWarning("Imaging instance disks failed (%s)", err) 1560 disk_abort = True 1561 1562 self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) 1563 1564 # instance disks are now active 1565 iobj.disks_active = True 1566 1567 # Release all node resource locks 1568 ReleaseLocks(self, locking.LEVEL_NODE_RES) 1569 1570 if iobj.os: 1571 result = self.rpc.call_os_diagnose([iobj.primary_node])[iobj.primary_node] 1572 result.Raise("Failed to get OS '%s'" % iobj.os) 1573 1574 trusted = None 1575 1576 for (name, _, _, _, _, _, _, os_trusted) in result.payload: 1577 if name == objects.OS.GetName(iobj.os): 1578 trusted = os_trusted 1579 break 1580 1581 if trusted is None: 1582 raise errors.OpPrereqError("OS '%s' is not available in node '%s'" % 1583 (iobj.os, iobj.primary_node)) 1584 elif trusted: 1585 self.RunOsScripts(feedback_fn, iobj) 1586 else: 1587 self.RunOsScriptsVirtualized(feedback_fn, iobj) 1588 # Instance is modified by 'RunOsScriptsVirtualized', 1589 # therefore, it must be retrieved once again from the 1590 # configuration, otherwise there will be a config object 1591 # version mismatch. 1592 iobj = self.cfg.GetInstanceInfo(iobj.uuid) 1593 1594 # Update instance metadata so that it can be reached from the 1595 # metadata service. 1596 UpdateMetadata(feedback_fn, self.rpc, iobj, 1597 osparams_private=self.op.osparams_private, 1598 osparams_secret=self.op.osparams_secret) 1599 1600 assert not self.owned_locks(locking.LEVEL_NODE_RES) 1601 1602 if self.op.start: 1603 iobj.admin_state = constants.ADMINST_UP 1604 self.cfg.Update(iobj, feedback_fn) 1605 logging.info("Starting instance %s on node %s", self.op.instance_name, 1606 self.pnode.name) 1607 feedback_fn("* starting instance...") 1608 result = self.rpc.call_instance_start(self.pnode.uuid, (iobj, None, None), 1609 False, self.op.reason) 1610 result.Raise("Could not start instance") 1611 1612 return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid)))
1613
1614 - def PrepareRetry(self, feedback_fn):
1615      # A temporary lack of resources can only happen if opportunistic locking 
1616      # is used. 
1617      assert self.op.opportunistic_locking 
1618   
1619      logging.info("Opportunistic locking did not succeed, falling back to" 
1620                   " full lock allocation") 
1621      feedback_fn("* falling back to full lock allocation") 
1622      self.op.opportunistic_locking = False 
1623