
Source Code for Module ganeti.cmdlib.instance_migration

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance

def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    (lu.op.target_node_uuid, lu.op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []

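# Both LUInstanceFailover and LUInstanceMigrate below share this helper and
# _DeclareLocksForMigration: the former is called from their ExpandNames, the
# latter from their DeclareLocks, so the locking setup for the shared
# TLMigrateInstance tasklet is identical for both operations.
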
def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
        del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes()  # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])

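# Note the declaration order implied above: node locks are computed at the
# NODE_ALLOC level (because the instance object is needed there anyway), the
# NODE level then only asserts that they already exist, and NODE_RES copies
# the node locks so resource locks match the node locks.
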
class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    # Positional arguments after instance_name: cleanup, failover=True,
    # fallback=False, ignore_consistency, allow_runtime_changes=True,
    # shutdown_timeout, ignore_ipolicy (see TLMigrateInstance.__init__).
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
    nl = [self.cfg.GetMasterNode()] + list(secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl + [instance.primary_node])

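# A hedged sketch of how this LU is typically reached: "gnt-instance
# failover" builds an OpInstanceFailover opcode, which the master daemon
# dispatches to LUInstanceFailover; the actual work is delegated to the
# TLMigrateInstance tasklet. The opcode field names below are assumptions
# based on opcode definitions elsewhere in Ganeti, not part of this module:
#
#   op = opcodes.OpInstanceFailover(instance_name="inst1.example.com",
#                                   ignore_consistency=False)
#   cli.SubmitOpCode(op)
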
class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  Unlike a failover, which shuts the instance down and restarts it,
  migration moves the instance without shutting it down.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    # Positional arguments after instance_name: cleanup, failover=False,
    # fallback taken from allow_failover, ignore_consistency=False,
    # allow_runtime_changes, the default shutdown timeout, ignore_ipolicy.
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
    snode_uuids = list(secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl)

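# Compared to LUInstanceFailover above, this LU constructs its tasklet with
# failover=False and live-migration parameters; a request might look like the
# following sketch (opcode field names assumed, not defined in this module):
#
#   op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
#                                  mode=constants.HT_MIGRATION_LIVE)
#   cli.SubmitOpCode(op)
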
class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we cleanup from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is
      not possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between
      source and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False  # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, computes the
    migration target and decides whether the operation must be
    downgraded to a failover.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      secondary_node_uuids = \
        self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
         (self.lu.op.target_node_uuid and
          self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
        (not self.failover or
         self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node),
                        prereq=True)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

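  # Note: CheckPrereq above silently downgrades a migration to a failover in
  # three cases: the instance is administratively down and fallback is
  # allowed; the backend parameter always_failover is set; or the hypervisor
  # reports the instance as not migratable and fallback is allowed.
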
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
        inst_uuid=self.instance_uuid,
        relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

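  # The relocate request above asks the configured iallocator plugin for a
  # replacement node, starting from the current primary; only the first node
  # in ial.result is used as the migration target.
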
  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            (disks, self.instance))
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          (disks, self.instance))
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    result = self.rpc.call_drbd_disconnect_net(
        self.all_node_uuids, (disks, self.instance))
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
                                           (disks, self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))

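  # The three helpers above implement the DRBD mode switches used below: a
  # live migration takes the disks through _EnsureSecondary ->
  # _GoStandalone -> _GoReconnect(True) (dual-master) before the memory
  # transfer and back to single-master afterwards, while cleanup and revert
  # run the same sequence with _GoReconnect(False).
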
  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.cfg.SetInstancePrimaryNode(self.instance.uuid,
                                      self.target_node_uuid)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore here errors, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    abort_result = self.rpc.call_instance_finalize_migration_dst(
        self.target_node_uuid, self.instance, self.migration_info,
        False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
        self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
        [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(
          self.instance.primary_node, self.instance, self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(
        self.target_node_uuid, self.instance, migration_info,
        self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(
          self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(
        self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)

    result = self.rpc.call_instance_finalize_migration_dst(
        self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
      disks = ExpandCheckDisks(inst_disks, inst_disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
      for (idx, dev) in enumerate(inst_disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
      self.target_node_uuid = secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()