
Source Code for Module ganeti.cmdlib.instance_migration

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
  31  """Logical units dealing with instance migration and failover."""
 32   
 33  import logging 
 34  import time 
 35   
 36  from ganeti import constants 
 37  from ganeti import errors 
 38  from ganeti import locking 
 39  from ganeti import hypervisor 
 40  from ganeti.masterd import iallocator 
 41  from ganeti import utils 
 42  from ganeti.cmdlib.base import LogicalUnit, Tasklet 
 43  from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \ 
 44    CheckIAllocatorOrNode, ExpandNodeUuidAndName 
 45  from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \ 
 46    ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks 
 47  from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \ 
 48    CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \ 
 49    CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist 
 50   
 51  import ganeti.masterd.instance 
 52   
 53   
  54  def _ExpandNamesForMigration(lu):
  55    """Expands names for use with L{TLMigrateInstance}.
  56
  57    @type lu: L{LogicalUnit}
  58
  59    """
  60    if lu.op.target_node is not None:
  61      (lu.op.target_node_uuid, lu.op.target_node) = \
  62        ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
  63
  64    lu.needed_locks[locking.LEVEL_NODE] = []
  65    lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  66    lu.dont_collate_locks[locking.LEVEL_NODE] = True
  67
  68    lu.needed_locks[locking.LEVEL_NODE_RES] = []
  69    lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
  70    lu.dont_collate_locks[locking.LEVEL_NODE_RES] = True
  71
  72    # The node allocation lock is actually only needed for externally replicated
  73    # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  74    lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
  75    lu.dont_collate_locks[locking.LEVEL_NODE_ALLOC] = True
  76
  77
  78  def _DeclareLocksForMigration(lu, level):
  79    """Declares locks for L{TLMigrateInstance}.
  80
  81    @type lu: L{LogicalUnit}
  82    @param level: Lock level
  83
  84    """
  85    if level == locking.LEVEL_NODE_ALLOC:
  86      assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
  87
  88      instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)
  89
  90      # Node locks are already declared here rather than at LEVEL_NODE as we need
  91      # the instance object anyway to declare the node allocation lock.
  92      if instance.disk_template in constants.DTS_EXT_MIRROR:
  93        if lu.op.target_node is None:
  94          lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  95          lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
  96        else:
  97          lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
  98                                                 lu.op.target_node_uuid]
  99          del lu.recalculate_locks[locking.LEVEL_NODE]
 100      else:
 101        lu._LockInstancesNodes()  # pylint: disable=W0212
 102
 103    elif level == locking.LEVEL_NODE:
 104      # Node locks are declared together with the node allocation lock
 105      assert (lu.needed_locks[locking.LEVEL_NODE] or
 106              lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
 107
 108    elif level == locking.LEVEL_NODE_RES:
 109      # Copy node locks
 110      lu.needed_locks[locking.LEVEL_NODE_RES] = \
 111        CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
 112
 113
 114  class LUInstanceFailover(LogicalUnit):
 115    """Failover an instance.
 116
 117    This is migration by shutting the instance down, but with the disks
 118    of the instance already available on the new node.
 119
 120    See also:
 121      L{LUInstanceMove} for moving an instance by copying the data.
 122
 123      L{LUInstanceMigrate} for the live migration of an instance (no shutdown
 124      required).
 125    """
 126    HPATH = "instance-failover"
 127    HTYPE = constants.HTYPE_INSTANCE
 128    REQ_BGL = False
 129
 130    def CheckArguments(self):
 131      """Check the arguments.
 132
 133      """
 134      self.iallocator = getattr(self.op, "iallocator", None)
 135      self.target_node = getattr(self.op, "target_node", None)
 136
 137    def ExpandNames(self):
 138      self._ExpandAndLockInstance()
 139      _ExpandNamesForMigration(self)
 140
 141      self._migrater = \
 142        TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
 143                          self.op.cleanup, True, False,
 144                          self.op.ignore_consistency, True,
 145                          self.op.shutdown_timeout, self.op.ignore_ipolicy, True)
 146
 147      self.tasklets = [self._migrater]
 148
 149    def DeclareLocks(self, level):
 150      _DeclareLocksForMigration(self, level)
 151
 152    def BuildHooksEnv(self):
 153      """Build hooks env.
 154
 155      This runs on master, primary and secondary nodes of the instance.
 156
 157      """
 158      instance = self._migrater.instance
 159      source_node_uuid = instance.primary_node
 160      target_node_uuid = self._migrater.target_node_uuid
 161      env = {
 162        "IGNORE_CONSISTENCY": self.op.ignore_consistency,
 163        "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
 164        "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
 165        "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
 166        "FAILOVER_CLEANUP": self.op.cleanup,
 167        }
 168
 169      if instance.disk_template in constants.DTS_INT_MIRROR:
 170        secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
 171        env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
 172        env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
 173      else:
 174        env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
 175
 176      env.update(BuildInstanceHookEnvByObject(self, instance))
 177
 178      return env
 179
 180    def BuildHooksNodes(self):
 181      """Build hooks nodes.
 182
 183      """
 184      instance = self._migrater.instance
 185      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
 186      nl = [self.cfg.GetMasterNode()] + list(secondary_nodes)
 187      nl.append(self._migrater.target_node_uuid)
 188      return (nl, nl + [instance.primary_node])
 189
 190
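(Editorial aside, not part of the module source: the failover hook environment assembled in BuildHooksEnv above is a flat dict of simple values. A rough illustration follows, with hypothetical node names; the real values come from the opcode and the cluster configuration.)

    # Illustrative only; not part of ganeti.cmdlib.instance_migration.
    env = {
      "IGNORE_CONSISTENCY": False,
      "SHUTDOWN_TIMEOUT": 120,
      "OLD_PRIMARY": "node1.example.com",
      "NEW_PRIMARY": "node2.example.com",
      "FAILOVER_CLEANUP": False,
      # For DRBD (internally mirrored) instances the old secondary becomes
      # the new primary, so the secondary entries swap accordingly:
      "OLD_SECONDARY": "node2.example.com",
      "NEW_SECONDARY": "node1.example.com",
    }
    # ...plus the common instance keys added by BuildInstanceHookEnvByObject().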
 191  class LUInstanceMigrate(LogicalUnit):
 192    """Migrate an instance.
 193
 194    This is migration without shutting down (live migration) and the disks are
 195    already available on the new node.
 196
 197    See also:
 198      L{LUInstanceMove} for moving an instance by copying the data.
 199
 200      L{LUInstanceFailover} for the migration of an instance where a shutdown is
 201      required.
 202    """
 203    HPATH = "instance-migrate"
 204    HTYPE = constants.HTYPE_INSTANCE
 205    REQ_BGL = False
 206
 207    def ExpandNames(self):
 208      self._ExpandAndLockInstance()
 209      _ExpandNamesForMigration(self)
 210
 211      self._migrater = \
 212        TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
 213                          self.op.cleanup, False, self.op.allow_failover, False,
 214                          self.op.allow_runtime_changes,
 215                          constants.DEFAULT_SHUTDOWN_TIMEOUT,
 216                          self.op.ignore_ipolicy, self.op.ignore_hvversions)
 217
 218      self.tasklets = [self._migrater]
 219
 220    def DeclareLocks(self, level):
 221      _DeclareLocksForMigration(self, level)
 222
 223    def BuildHooksEnv(self):
 224      """Build hooks env.
 225
 226      This runs on master, primary and secondary nodes of the instance.
 227
 228      """
 229      instance = self._migrater.instance
 230      source_node_uuid = instance.primary_node
 231      target_node_uuid = self._migrater.target_node_uuid
 232      env = BuildInstanceHookEnvByObject(self, instance)
 233      env.update({
 234        "MIGRATE_LIVE": self._migrater.live,
 235        "MIGRATE_CLEANUP": self.op.cleanup,
 236        "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
 237        "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
 238        "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
 239        })
 240
 241      if instance.disk_template in constants.DTS_INT_MIRROR:
 242        secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
 243        env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
 244        env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
 245      else:
 246        env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
 247
 248      return env
 249
 250    def BuildHooksNodes(self):
 251      """Build hooks nodes.
 252
 253      """
 254      instance = self._migrater.instance
 255      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
 256      snode_uuids = list(secondary_nodes)
 257      nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
 258      nl.append(self._migrater.target_node_uuid)
 259      return (nl, nl)
 260
 261
 262  class TLMigrateInstance(Tasklet):
 263    """Tasklet class for instance migration.
 264
 265    @type live: boolean
 266    @ivar live: whether the migration will be done live or non-live;
 267      this variable is initialized only after CheckPrereq has run
 268    @type cleanup: boolean
 269    @ivar cleanup: Whether we cleanup from a failed migration
 270    @type iallocator: string
 271    @ivar iallocator: The iallocator used to determine target_node
 272    @type target_node_uuid: string
 273    @ivar target_node_uuid: If given, the target node UUID to reallocate the
 274      instance to
 275    @type failover: boolean
 276    @ivar failover: Whether operation results in failover or migration
 277    @type fallback: boolean
 278    @ivar fallback: Whether fallback to failover is allowed if migration is
 279      not possible
 280    @type ignore_consistency: boolean
 281    @ivar ignore_consistency: Whether we should ignore consistency between the
 282      source and target node
 283    @type shutdown_timeout: int
 284    @ivar shutdown_timeout: In case of failover, the timeout of the shutdown
 285    @type ignore_ipolicy: bool
 286    @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
 287    @type ignore_hvversions: bool
 288    @ivar ignore_hvversions: If true, accept incompatible hypervisor versions
 289
 290    """
 291
 292    # Constants
 293    _MIGRATION_POLL_INTERVAL = 1      # seconds
 294    _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
 295
 296    def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
 297                 fallback, ignore_consistency, allow_runtime_changes,
 298                 shutdown_timeout, ignore_ipolicy, ignore_hvversions):
 299      """Initializes this class.
 300
 301      """
 302      Tasklet.__init__(self, lu)
 303
 304      # Parameters
 305      self.instance_uuid = instance_uuid
 306      self.instance_name = instance_name
 307      self.cleanup = cleanup
 308      self.live = False  # will be overridden later
 309      self.failover = failover
 310      self.fallback = fallback
 311      self.ignore_consistency = ignore_consistency
 312      self.shutdown_timeout = shutdown_timeout
 313      self.ignore_ipolicy = ignore_ipolicy
 314      self.allow_runtime_changes = allow_runtime_changes
 315      self.ignore_hvversions = ignore_hvversions
 316
 317    def CheckPrereq(self):
 318      """Check prerequisites.
 319
 320      This checks that the instance is in the cluster.
 321
 322      """
 323      (self.instance_uuid, self.instance_name) = \
 324        ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
 325                                  self.instance_name)
 326      self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
 327      assert self.instance is not None
 328      cluster = self.cfg.GetClusterInfo()
 329
 330      if (not self.cleanup and
 331          not self.instance.admin_state == constants.ADMINST_UP and
 332          not self.failover and self.fallback):
 333        self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
 334                        " switching to failover")
 335        self.failover = True
 336
 337      if self.instance.disk_template not in constants.DTS_MIRRORED:
 338        if self.failover:
 339          text = "failovers"
 340        else:
 341          text = "migrations"
 342        raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
 343                                   " %s" % (self.instance.disk_template, text),
 344                                   errors.ECODE_STATE)
 345
 346      if self.instance.disk_template in constants.DTS_EXT_MIRROR:
 347        CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 348
 349        if self.lu.op.iallocator:
 350          assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
 351          self._RunAllocator()
 352        else:
 353          # We set self.target_node_uuid as it is required by
 354          # BuildHooksEnv
 355          self.target_node_uuid = self.lu.op.target_node_uuid
 356
 357        # Check that the target node is correct in terms of instance policy
 358        nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
 359        group_info = self.cfg.GetNodeGroup(nodeinfo.group)
 360        ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
 361                                                                group_info)
 362        CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
 363                               self.cfg, ignore=self.ignore_ipolicy)
 364
 365        # self.target_node is already populated, either directly or by the
 366        # iallocator run
 367        target_node_uuid = self.target_node_uuid
 368        if self.target_node_uuid == self.instance.primary_node:
 369          raise errors.OpPrereqError(
 370            "Cannot migrate instance %s to its primary (%s)" %
 371            (self.instance.name,
 372             self.cfg.GetNodeName(self.instance.primary_node)),
 373            errors.ECODE_STATE)
 374
 375        if len(self.lu.tasklets) == 1:
 376          # It is safe to release locks only when we're the only tasklet
 377          # in the LU
 378          ReleaseLocks(self.lu, locking.LEVEL_NODE,
 379                       keep=[self.instance.primary_node, self.target_node_uuid])
 380          ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
 381
 382      else:
 383        secondary_node_uuids = \
 384          self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
 385        if not secondary_node_uuids:
 386          raise errors.ConfigurationError("No secondary node but using"
 387                                          " %s disk template" %
 388                                          self.instance.disk_template)
 389        self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
 390        if self.lu.op.iallocator or \
 391            (self.lu.op.target_node_uuid and
 392             self.lu.op.target_node_uuid != target_node_uuid):
 393          if self.failover:
 394            text = "failed over"
 395          else:
 396            text = "migrated"
 397          raise errors.OpPrereqError("Instances with disk template %s cannot"
 398                                     " be %s to arbitrary nodes"
 399                                     " (neither an iallocator nor a target"
 400                                     " node can be passed)" %
 401                                     (self.instance.disk_template, text),
 402                                     errors.ECODE_INVAL)
 403        nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
 404        group_info = self.cfg.GetNodeGroup(nodeinfo.group)
 405        ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
 406                                                                group_info)
 407        CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
 408                               self.cfg, ignore=self.ignore_ipolicy)
 409
 410      i_be = cluster.FillBE(self.instance)
 411
 412      # check memory requirements on the secondary node
 413      if (not self.cleanup and
 414          (not self.failover or
 415           self.instance.admin_state == constants.ADMINST_UP)):
 416        self.tgt_free_mem = CheckNodeFreeMemory(
 417          self.lu, target_node_uuid,
 418          "migrating instance %s" % self.instance.name,
 419          i_be[constants.BE_MINMEM], self.instance.hypervisor,
 420          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
 421      else:
 422        self.lu.LogInfo("Not checking memory on the secondary node as"
 423                        " instance will not be started")
 424
 425      # check if failover must be forced instead of migration
 426      if (not self.cleanup and not self.failover and
 427          i_be[constants.BE_ALWAYS_FAILOVER]):
 428        self.lu.LogInfo("Instance configured to always failover; fallback"
 429                        " to failover")
 430        self.failover = True
 431
 432      # check bridge existence
 433      CheckInstanceBridgesExist(self.lu, self.instance,
 434                                node_uuid=target_node_uuid)
 435
 436      if not self.cleanup:
 437        CheckNodeNotDrained(self.lu, target_node_uuid)
 438        if not self.failover:
 439          result = self.rpc.call_instance_migratable(self.instance.primary_node,
 440                                                     self.instance)
 441          if result.fail_msg and self.fallback:
 442            self.lu.LogInfo("Can't migrate, instance offline, fallback to"
 443                            " failover")
 444            self.failover = True
 445          else:
 446            result.Raise("Can't migrate, please use failover",
 447                         prereq=True, ecode=errors.ECODE_STATE)
 448
 449      assert not (self.failover and self.cleanup)
 450
 451      if not self.failover:
 452        if self.lu.op.live is not None and self.lu.op.mode is not None:
 453          raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
 454                                     " parameters are accepted",
 455                                     errors.ECODE_INVAL)
 456        if self.lu.op.live is not None:
 457          if self.lu.op.live:
 458            self.lu.op.mode = constants.HT_MIGRATION_LIVE
 459          else:
 460            self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
 461          # reset the 'live' parameter to None so that repeated
 462          # invocations of CheckPrereq do not raise an exception
 463          self.lu.op.live = None
 464        elif self.lu.op.mode is None:
 465          # read the default value from the hypervisor
 466          i_hv = cluster.FillHV(self.instance, skip_globals=False)
 467          self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
 468
 469        self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
 470      else:
 471        # Failover is never live
 472        self.live = False
 473
 474      if not (self.failover or self.cleanup):
 475        remote_info = self.rpc.call_instance_info(
 476          self.instance.primary_node, self.instance.name,
 477          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
 478        remote_info.Raise("Error checking instance on node %s" %
 479                          self.cfg.GetNodeName(self.instance.primary_node),
 480                          prereq=True)
 481        instance_running = bool(remote_info.payload)
 482        if instance_running:
 483          self.current_mem = int(remote_info.payload["memory"])
 484
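(Editorial aside: the live/non-live decision near the end of CheckPrereq, source lines 451-472, is easy to lose among the surrounding checks. The sketch below is a standalone distillation under the same rules; the function name and the string stand-ins for the HT_MIGRATION_* constants are hypothetical.)

    # Illustrative sketch only; not part of this module.
    def resolve_live_mode(op_live, op_mode, hv_default_mode, failover):
      if failover:
        return False                       # failover never transfers memory
      if op_live is not None and op_mode is not None:
        raise ValueError("only one of 'live' and 'mode' may be given")
      if op_live is not None:
        op_mode = "live" if op_live else "non-live"
      elif op_mode is None:
        op_mode = hv_default_mode          # hypervisor's HV_MIGRATION_MODE default
      return op_mode == "live"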
 485    def _RunAllocator(self):
 486      """Run the allocator based on input opcode.
 487
 488      """
 489      assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
 490
 491      # FIXME: add a self.ignore_ipolicy option
 492      req = iallocator.IAReqRelocate(
 493            inst_uuid=self.instance_uuid,
 494            relocate_from_node_uuids=[self.instance.primary_node])
 495      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 496
 497      ial.Run(self.lu.op.iallocator)
 498
 499      if not ial.success:
 500        raise errors.OpPrereqError("Can't compute nodes using"
 501                                   " iallocator '%s': %s" %
 502                                   (self.lu.op.iallocator, ial.info),
 503                                   errors.ECODE_NORES)
 504      self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
 505      self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
 506                      self.instance_name, self.lu.op.iallocator,
 507                      utils.CommaJoin(ial.result))
 508
 509    def _WaitUntilSync(self):
 510      """Poll with custom rpc for disk sync.
 511
 512      This uses our own step-based rpc call.
 513
 514      """
 515      self.feedback_fn("* wait until resync is done")
 516      all_done = False
 517      disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 518      while not all_done:
 519        all_done = True
 520        result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
 521                                              (disks, self.instance))
 522        min_percent = 100
 523        for node_uuid, nres in result.items():
 524          nres.Raise("Cannot resync disks on node %s" %
 525                     self.cfg.GetNodeName(node_uuid))
 526          node_done, node_percent = nres.payload
 527          all_done = all_done and node_done
 528          if node_percent is not None:
 529            min_percent = min(min_percent, node_percent)
 530        if not all_done:
 531          if min_percent < 100:
 532            self.feedback_fn(" - progress: %.1f%%" % min_percent)
 533          time.sleep(2)
 534
 535    def _EnsureSecondary(self, node_uuid):
 536      """Demote a node to secondary.
 537
 538      """
 539      self.feedback_fn("* switching node %s to secondary mode" %
 540                       self.cfg.GetNodeName(node_uuid))
 541
 542      disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 543      result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
 544                                            (disks, self.instance))
 545      result.Raise("Cannot change disk to secondary on node %s" %
 546                   self.cfg.GetNodeName(node_uuid))
 547
 548    def _GoStandalone(self):
 549      """Disconnect from the network.
 550
 551      """
 552      self.feedback_fn("* changing into standalone mode")
 553      disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 554      result = self.rpc.call_drbd_disconnect_net(
 555                 self.all_node_uuids, (disks, self.instance))
 556      for node_uuid, nres in result.items():
 557        nres.Raise("Cannot disconnect disks node %s" %
 558                   self.cfg.GetNodeName(node_uuid))
 559
 560    def _GoReconnect(self, multimaster):
 561      """Reconnect to the network.
 562
 563      """
 564      if multimaster:
 565        msg = "dual-master"
 566      else:
 567        msg = "single-master"
 568      self.feedback_fn("* changing disks into %s mode" % msg)
 569      disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 570      result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
 571                                             (disks, self.instance),
 572                                             self.instance.name, multimaster)
 573      for node_uuid, nres in result.items():
 574        nres.Raise("Cannot change disks config on node %s" %
 575                   self.cfg.GetNodeName(node_uuid))
 576
 577    def _ExecCleanup(self):
 578      """Try to cleanup after a failed migration.
 579
 580      The cleanup is done by:
 581        - check that the instance is running only on one node
 582          (and update the config if needed)
 583        - change disks on its secondary node to secondary
 584        - wait until disks are fully synchronized
 585        - disconnect from the network
 586        - change disks into single-master mode
 587        - wait again until disks are fully synchronized
 588
 589      """
 590      # check running on only one node
 591      self.feedback_fn("* checking where the instance actually runs"
 592                       " (if this hangs, the hypervisor might be in"
 593                       " a bad state)")
 594      cluster_hvparams = self.cfg.GetClusterInfo().hvparams
 595      ins_l = self.rpc.call_instance_list(self.all_node_uuids,
 596                                          [self.instance.hypervisor],
 597                                          cluster_hvparams)
 598      for node_uuid, result in ins_l.items():
 599        result.Raise("Can't contact node %s" % node_uuid)
 600
 601      runningon_source = self.instance.name in \
 602                           ins_l[self.source_node_uuid].payload
 603      runningon_target = self.instance.name in \
 604                           ins_l[self.target_node_uuid].payload
 605
 606      if runningon_source and runningon_target:
 607        raise errors.OpExecError("Instance seems to be running on two nodes,"
 608                                 " or the hypervisor is confused; you will have"
 609                                 " to ensure manually that it runs only on one"
 610                                 " and restart this operation")
 611
 612      if not (runningon_source or runningon_target):
 613        raise errors.OpExecError("Instance does not seem to be running at all;"
 614                                 " in this case it's safer to repair by"
 615                                 " running 'gnt-instance stop' to ensure disk"
 616                                 " shutdown, and then restarting it")
 617
 618      if runningon_target:
 619        # the migration has actually succeeded, we need to update the config
 620        self.feedback_fn("* instance running on secondary node (%s),"
 621                         " updating config" %
 622                         self.cfg.GetNodeName(self.target_node_uuid))
 623        self.cfg.SetInstancePrimaryNode(self.instance.uuid,
 624                                        self.target_node_uuid)
 625        demoted_node_uuid = self.source_node_uuid
 626      else:
 627        self.feedback_fn("* instance confirmed to be running on its"
 628                         " primary node (%s)" %
 629                         self.cfg.GetNodeName(self.source_node_uuid))
 630        demoted_node_uuid = self.target_node_uuid
 631
 632      if self.instance.disk_template in constants.DTS_INT_MIRROR:
 633        self._EnsureSecondary(demoted_node_uuid)
 634        try:
 635          self._WaitUntilSync()
 636        except errors.OpExecError:
 637          # we ignore errors here, since if the device is standalone, it
 638          # won't be able to sync
 639          pass
 640        self._GoStandalone()
 641        self._GoReconnect(False)
 642        self._WaitUntilSync()
 643
 644      self.feedback_fn("* done")
 645
 646    def _RevertDiskStatus(self):
 647      """Try to revert the disk status after a failed migration.
 648
 649      """
 650      if self.instance.disk_template in constants.DTS_EXT_MIRROR:
 651        return
 652
 653      try:
 654        self._EnsureSecondary(self.target_node_uuid)
 655        self._GoStandalone()
 656        self._GoReconnect(False)
 657        self._WaitUntilSync()
 658      except errors.OpExecError, err:
 659        self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
 660                           " please try to recover the instance manually;"
 661                           " error '%s'" % str(err))
 662
 663    def _AbortMigration(self):
 664      """Call the hypervisor code to abort a started migration.
 665
 666      """
 667      abort_result = self.rpc.call_instance_finalize_migration_dst(
 668                       self.target_node_uuid, self.instance, self.migration_info,
 669                       False)
 670      abort_msg = abort_result.fail_msg
 671      if abort_msg:
 672        logging.error("Aborting migration failed on target node %s: %s",
 673                      self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
 674        # Don't raise an exception here, as we still have to try to revert the
 675        # disk status, even if this step failed.
 676
 677      abort_result = self.rpc.call_instance_finalize_migration_src(
 678                       self.source_node_uuid, self.instance, False, self.live)
 679      abort_msg = abort_result.fail_msg
 680      if abort_msg:
 681        logging.error("Aborting migration failed on source node %s: %s",
 682                      self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
 683
 684    def _ExecMigration(self):
 685      """Migrate an instance.
 686
 687      The migration is done by:
 688        - change the disks into dual-master mode
 689        - wait until disks are fully synchronized again
 690        - migrate the instance
 691        - change disks on the new secondary node (the old primary) to secondary
 692        - wait until disks are fully synchronized
 693        - change disks into single-master mode
 694
 695      """
 696      # Check for hypervisor version mismatch and warn the user.
 697      hvspecs = [(self.instance.hypervisor,
 698                  self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
 699      nodeinfo = self.rpc.call_node_info(
 700                   [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
 701      for ninfo in nodeinfo.values():
 702        ninfo.Raise("Unable to retrieve node information from node '%s'" %
 703                    ninfo.node)
 704      (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
 705      (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
 706
 707      if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
 708          (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
 709        src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
 710        dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
 711        if src_version != dst_version:
 712          self.feedback_fn("* warning: hypervisor version mismatch between"
 713                           " source (%s) and target (%s) node" %
 714                           (src_version, dst_version))
 715          hv = hypervisor.GetHypervisorClass(self.instance.hypervisor)
 716          if hv.VersionsSafeForMigration(src_version, dst_version):
 717            self.feedback_fn(" migrating from hypervisor version %s to %s should"
 718                             " be safe" % (src_version, dst_version))
 719          else:
 720            self.feedback_fn(" migrating from hypervisor version %s to %s is"
 721                             " likely unsupported" % (src_version, dst_version))
 722            if self.ignore_hvversions:
 723              self.feedback_fn(" continuing anyway (told to ignore version"
 724                               " mismatch)")
 725            else:
 726              raise errors.OpExecError("Unsupported migration between hypervisor"
 727                                       " versions (%s to %s)" %
 728                                       (src_version, dst_version))
 729
 730      self.feedback_fn("* checking disk consistency between source and target")
 731      for (idx, dev) in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)):
 732        if not CheckDiskConsistency(self.lu, self.instance, dev,
 733                                    self.target_node_uuid,
 734                                    False):
 735          raise errors.OpExecError("Disk %s is degraded or not fully"
 736                                   " synchronized on target node,"
 737                                   " aborting migration" % idx)
 738
 739      if self.current_mem > self.tgt_free_mem:
 740        if not self.allow_runtime_changes:
 741          raise errors.OpExecError("Memory ballooning not allowed and not enough"
 742                                   " free memory to fit instance %s on target"
 743                                   " node %s (have %dMB, need %dMB)" %
 744                                   (self.instance.name,
 745                                    self.cfg.GetNodeName(self.target_node_uuid),
 746                                    self.tgt_free_mem, self.current_mem))
 747        self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
 748        rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
 749                                                       self.instance,
 750                                                       self.tgt_free_mem)
 751        rpcres.Raise("Cannot modify instance runtime memory")
 752
 753      # First get the migration information from the remote node
 754      result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
 755      msg = result.fail_msg
 756      if msg:
 757        log_err = ("Failed fetching source migration information from %s: %s" %
 758                   (self.cfg.GetNodeName(self.source_node_uuid), msg))
 759        logging.error(log_err)
 760        raise errors.OpExecError(log_err)
 761
 762      self.migration_info = migration_info = result.payload
 763
 764      if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
 765        # Then switch the disks to master/master mode
 766        self._EnsureSecondary(self.target_node_uuid)
 767        self._GoStandalone()
 768        self._GoReconnect(True)
 769        self._WaitUntilSync()
 770
 771      self.feedback_fn("* preparing %s to accept the instance" %
 772                       self.cfg.GetNodeName(self.target_node_uuid))
 773      result = self.rpc.call_accept_instance(self.target_node_uuid,
 774                                             self.instance,
 775                                             migration_info,
 776                                             self.nodes_ip[self.target_node_uuid])
 777
 778      msg = result.fail_msg
 779      if msg:
 780        logging.error("Instance pre-migration failed, trying to revert"
 781                      " disk status: %s", msg)
 782        self.feedback_fn("Pre-migration failed, aborting")
 783        self._AbortMigration()
 784        self._RevertDiskStatus()
 785        raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
 786                                 (self.instance.name, msg))
 787
 788      self.feedback_fn("* migrating instance to %s" %
 789                       self.cfg.GetNodeName(self.target_node_uuid))
 790      cluster = self.cfg.GetClusterInfo()
 791      result = self.rpc.call_instance_migrate(
 792                 self.source_node_uuid, cluster.cluster_name, self.instance,
 793                 self.nodes_ip[self.target_node_uuid], self.live)
 794      msg = result.fail_msg
 795      if msg:
 796        logging.error("Instance migration failed, trying to revert"
 797                      " disk status: %s", msg)
 798        self.feedback_fn("Migration failed, aborting")
 799        self._AbortMigration()
 800        self._RevertDiskStatus()
 801        raise errors.OpExecError("Could not migrate instance %s: %s" %
 802                                 (self.instance.name, msg))
 803
 804      self.feedback_fn("* starting memory transfer")
 805      last_feedback = time.time()
 806      while True:
 807        result = self.rpc.call_instance_get_migration_status(
 808                   self.source_node_uuid, self.instance)
 809        msg = result.fail_msg
 810        ms = result.payload  # MigrationStatus instance
 811        if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
 812          logging.error("Instance migration failed, trying to revert"
 813                        " disk status: %s", msg)
 814          self.feedback_fn("Migration failed, aborting")
 815          self._AbortMigration()
 816          self._RevertDiskStatus()
 817          if not msg:
 818            msg = "hypervisor returned failure"
 819          raise errors.OpExecError("Could not migrate instance %s: %s" %
 820                                   (self.instance.name, msg))
 821
 822        if result.payload.status != constants.HV_MIGRATION_ACTIVE:
 823          self.feedback_fn("* memory transfer complete")
 824          break
 825
 826        if (utils.TimeoutExpired(last_feedback,
 827                                 self._MIGRATION_FEEDBACK_INTERVAL) and
 828            ms.transferred_ram is not None):
 829          mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
 830          self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
 831          last_feedback = time.time()
 832
 833        time.sleep(self._MIGRATION_POLL_INTERVAL)
 834
 835      result = self.rpc.call_instance_finalize_migration_src(
 836                 self.source_node_uuid, self.instance, True, self.live)
 837      msg = result.fail_msg
 838      if msg:
 839        logging.error("Instance migration succeeded, but finalization failed"
 840                      " on the source node: %s", msg)
 841        raise errors.OpExecError("Could not finalize instance migration: %s" %
 842                                 msg)
 843
 844      self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid)
 845      self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
 846
 847      result = self.rpc.call_instance_finalize_migration_dst(
 848                 self.target_node_uuid, self.instance, migration_info, True)
 849      msg = result.fail_msg
 850      if msg:
 851        logging.error("Instance migration succeeded, but finalization failed"
 852                      " on the target node: %s", msg)
 853        raise errors.OpExecError("Could not finalize instance migration: %s" %
 854                                 msg)
 855
 856      if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
 857        self._EnsureSecondary(self.source_node_uuid)
 858        self._WaitUntilSync()
 859        self._GoStandalone()
 860        self._GoReconnect(False)
 861        self._WaitUntilSync()
 862
 863      # If the instance's disk template is `rbd' or `ext' and there was a
 864      # successful migration, unmap the device from the source node.
 865      if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
 866        inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 867        disks = ExpandCheckDisks(inst_disks, inst_disks)
 868        self.feedback_fn("* unmapping instance's disks from %s" %
 869                         self.cfg.GetNodeName(self.source_node_uuid))
 870        for disk in disks:
 871          result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
 872                                                   (disk, self.instance))
 873          msg = result.fail_msg
 874          if msg:
 875            logging.error("Migration was successful, but couldn't unmap the"
 876                          " block device %s on source node %s: %s",
 877                          disk.iv_name,
 878                          self.cfg.GetNodeName(self.source_node_uuid), msg)
 879            logging.error("You need to unmap the device %s manually on %s",
 880                          disk.iv_name,
 881                          self.cfg.GetNodeName(self.source_node_uuid))
 882
 883      self.feedback_fn("* done")
 884
 885    def _ExecFailover(self):
 886      """Failover an instance.
 887
 888      The failover is done by shutting it down on its present node and
 889      starting it on the secondary.
 890
 891      """
 892      primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)
 893
 894      source_node_uuid = self.instance.primary_node
 895
 896      if self.instance.disks_active:
 897        self.feedback_fn("* checking disk consistency between source and target")
 898        inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
 899        for (idx, dev) in enumerate(inst_disks):
 900          # for drbd, these are drbd over lvm
 901          if not CheckDiskConsistency(self.lu, self.instance, dev,
 902                                      self.target_node_uuid, False):
 903            if primary_node.offline:
 904              self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
 905                               " target node %s" %
 906                               (primary_node.name, idx,
 907                                self.cfg.GetNodeName(self.target_node_uuid)))
 908            elif not self.ignore_consistency:
 909              raise errors.OpExecError("Disk %s is degraded on target node,"
 910                                       " aborting failover" % idx)
 911      else:
 912        self.feedback_fn("* not checking disk consistency as instance is not"
 913                         " running")
 914
 915      self.feedback_fn("* shutting down instance on source node")
 916      logging.info("Shutting down instance %s on node %s",
 917                   self.instance.name, self.cfg.GetNodeName(source_node_uuid))
 918
 919      result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
 920                                               self.shutdown_timeout,
 921                                               self.lu.op.reason)
 922      msg = result.fail_msg
 923      if msg:
 924        if self.ignore_consistency or primary_node.offline:
 925          self.lu.LogWarning("Could not shutdown instance %s on node %s,"
 926                             " proceeding anyway; please make sure node"
 927                             " %s is down; error details: %s",
 928                             self.instance.name,
 929                             self.cfg.GetNodeName(source_node_uuid),
 930                             self.cfg.GetNodeName(source_node_uuid), msg)
 931        else:
 932          raise errors.OpExecError("Could not shutdown instance %s on"
 933                                   " node %s: %s" %
 934                                   (self.instance.name,
 935                                    self.cfg.GetNodeName(source_node_uuid), msg))
 936
 937      self.feedback_fn("* deactivating the instance's disks on source node")
 938      if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
 939        raise errors.OpExecError("Can't shut down the instance's disks")
 940
 941      self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid)
 942      self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
 943
 944      # Only start the instance if it's marked as up
 945      if self.instance.admin_state == constants.ADMINST_UP:
 946        self.feedback_fn("* activating the instance's disks on target node %s" %
 947                         self.cfg.GetNodeName(self.target_node_uuid))
 948        logging.info("Starting instance %s on node %s", self.instance.name,
 949                     self.cfg.GetNodeName(self.target_node_uuid))
 950
 951        disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
 952                                            ignore_secondaries=True)
 953        if not disks_ok:
 954          ShutdownInstanceDisks(self.lu, self.instance)
 955          raise errors.OpExecError("Can't activate the instance's disks")
 956
 957        self.feedback_fn("* starting the instance on the target node %s" %
 958                         self.cfg.GetNodeName(self.target_node_uuid))
 959        result = self.rpc.call_instance_start(self.target_node_uuid,
 960                                              (self.instance, None, None), False,
 961                                              self.lu.op.reason)
 962        msg = result.fail_msg
 963        if msg:
 964          ShutdownInstanceDisks(self.lu, self.instance)
 965          raise errors.OpExecError("Could not start instance %s on node %s: %s" %
 966                                   (self.instance.name,
 967                                    self.cfg.GetNodeName(self.target_node_uuid),
 968                                    msg))
 969
 970    def Exec(self, feedback_fn):
 971      """Perform the migration.
 972
 973      """
 974      self.feedback_fn = feedback_fn
 975      self.source_node_uuid = self.instance.primary_node
 976
 977      # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
 978      if self.instance.disk_template in constants.DTS_INT_MIRROR:
 979        secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
 980        self.target_node_uuid = secondary_nodes[0]
 981        # Otherwise self.target_node has been populated either
 982        # directly, or through an iallocator.
 983
 984      self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
 985      self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
 986                           in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
 987
 988      if self.failover:
 989        feedback_fn("Failover instance %s" % self.instance.name)
 990        self._ExecFailover()
 991      else:
 992        feedback_fn("Migrating instance %s" % self.instance.name)
 993
 994        if self.cleanup:
 995          return self._ExecCleanup()
 996        else:
 997          return self._ExecMigration()
 998
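(Editorial aside: these logical units are normally driven through the opcode layer rather than instantiated directly. The snippet below is a minimal sketch of submitting a live migration job; it assumes the usual opcodes and luxi client interfaces, and the instance name is hypothetical.)

    # Illustrative sketch only; assumes standard Ganeti client interfaces.
    from ganeti import constants
    from ganeti import luxi
    from ganeti import opcodes

    op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
                                   mode=constants.HT_MIGRATION_LIVE,
                                   cleanup=False)
    cl = luxi.Client()
    job_id = cl.SubmitJob([op])  # executed by LUInstanceMigrate / TLMigrateInstance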