
Source Code for Module ganeti.cmdlib.instance_migration

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
        del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes()  # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
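

# The branching in _DeclareLocksForMigration reduces to a small decision
# table. A minimal standalone sketch of that table follows; it is purely
# illustrative (the function below is not part of the real module) and uses
# string placeholders instead of the real lock constants.
def _SketchNodeLocksForMigration(ext_mirror, target_node, primary_node):
  """Illustrates which node locks a migration would request."""
  if not ext_mirror:
    # Internally mirrored (e.g. DRBD): the instance's own nodes are locked
    # via lu._LockInstancesNodes().
    return "instance nodes"
  if target_node is None:
    # Externally mirrored, target chosen by an iallocator: lock all nodes
    # plus the node allocation lock.
    return "ALL_SET plus node allocation lock"
  # Externally mirrored with an explicit target: only the two nodes involved.
  return [primary_node, target_node]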


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])
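

# For orientation, a hypothetical environment as LUInstanceFailover's
# BuildHooksEnv might produce it for a DRBD-mirrored instance; all values
# below are made up for illustration, and the real environment additionally
# carries the variables added by BuildInstanceHookEnvByObject (omitted here).
_EXAMPLE_FAILOVER_HOOK_ENV = {
  "IGNORE_CONSISTENCY": False,
  "SHUTDOWN_TIMEOUT": 120,
  "OLD_PRIMARY": "node1.example.com",
  "NEW_PRIMARY": "node2.example.com",    # the former DRBD secondary
  "OLD_SECONDARY": "node2.example.com",
  "NEW_SECONDARY": "node1.example.com",  # primary and secondary swap roles
  "FAILOVER_CLEANUP": False,
  }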


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether the operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is
      not possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between the
      source and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False  # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
        (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])
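
  # The live/non-live decision at the end of CheckPrereq follows a simple
  # precedence; a commented sketch with simplified values (illustrative
  # only, not part of the real module):
  #
  #   def resolve_mode(live, mode, hv_default):
  #     if live is not None and mode is not None:
  #       raise ValueError("only one of 'live' and 'mode' may be given")
  #     if live is not None:
  #       return "live" if live else "non-live"
  #     if mode is None:
  #       return hv_default  # the hypervisor's HV_MIGRATION_MODE default
  #     return mode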

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn(" - progress: %.1f%%" % min_percent)
        time.sleep(2)
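
  # Example of the aggregation above: if two nodes report
  # (node_done, node_percent) payloads of (True, None) and (False, 82.5),
  # the loop keeps waiting and reports " - progress: 82.5%".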

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks,
                                            self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)
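
  # Together, _GoStandalone and _GoReconnect implement the DRBD network
  # cycle used below: disconnect all disks, then reattach them either
  # single-master (normal operation) or dual-master (only while a live
  # migration is in flight).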

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore here errors, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")
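
  # The "where does it run" check in _ExecCleanup reduces to this truth
  # table (illustrative summary):
  #
  #   running on source | running on target | action
  #   ------------------+-------------------+------------------------------
  #   yes               | yes               | abort, manual repair needed
  #   no                | no                | abort, instance appears down
  #   no                | yes               | adopt target node as primary
  #   yes               | no                | keep source, demote target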

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    # This fills physical_id slot that may be missing on newly created disks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload  # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")
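
  # The throttled progress feedback in the memory-transfer loop above boils
  # down to the following sketch (values illustrative; the RAM figures come
  # from the hypervisor's MigrationStatus object):
  #
  #   if time.time() - last_feedback > _MIGRATION_FEEDBACK_INTERVAL:
  #     pct = 100 * float(transferred_ram) / float(total_ram)
  #     feedback_fn("* memory transfer progress: %.2f %%" % pct)
  #     last_feedback = time.time()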

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node,
                                            (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()
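

# For orientation, these logical units are normally driven by submitting an
# opcode through the master daemon rather than by instantiating them
# directly. A hedged sketch, assuming the standard OpInstanceMigrate opcode
# and a local luxi client (illustrative only, not part of this module):
#
#   from ganeti import opcodes
#   from ganeti import luxi
#
#   op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
#                                  cleanup=False)
#   job_id = luxi.Client().SubmitJob([op])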