Source Code for Module ganeti.cmdlib.instance_migration

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance

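# Note (added for readability): the two LogicalUnits below, LUInstanceFailover
# and LUInstanceMigrate, only expand names, declare locks and build the hook
# environment; the actual failover/migration work is delegated to the
# TLMigrateInstance tasklet defined at the end of this module.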
 43   
def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    (lu.op.target_node_uuid, lu.op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
        del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes()  # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.op.target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


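# Illustrative sketch (not part of the original module): a failover is
# normally requested by submitting an OpInstanceFailover opcode, which the
# master daemon dispatches to LUInstanceFailover above. The field names here
# follow ganeti/opcodes.py but are shown only as an assumed example.
#
#   from ganeti import opcodes
#   op = opcodes.OpInstanceFailover(instance_name="inst1.example.com",
#                                   ignore_consistency=False,
#                                   shutdown_timeout=120)
#   # roughly what "gnt-instance failover inst1.example.com" submits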
class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.op.target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snode_uuids = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    return (nl, nl)


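# Illustrative sketch (not part of the original module): a migration request
# is built as an OpInstanceMigrate opcode and consumed by LUInstanceMigrate
# above. The field names follow ganeti/opcodes.py but are an assumed example.
#
#   from ganeti import opcodes, constants
#   op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
#                                  mode=constants.HT_MIGRATION_LIVE,
#                                  allow_failover=False)
#   # roughly what "gnt-instance migrate inst1.example.com" submits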
class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is not
      possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between the
      source and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout used for the
      instance shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1  # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10  # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False  # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_node_uuids = self.instance.secondary_nodes
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
        (not self.failover or
         self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node))
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

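  # Summary comment (added for readability): CheckPrereq above downgrades a
  # requested migration to a failover when the instance is marked down and
  # fallback is allowed, when the always_failover backend parameter is set,
  # or when the hypervisor reports the instance as not migratable and
  # fallback is allowed. The live/non-live mode is resolved from op.live,
  # then op.mode, then the hypervisor's default migration mode.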
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
        inst_uuid=self.instance_uuid,
        relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node_uuid)

    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
                                               self.nodes_ip,
                                               self.instance.disks)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
                                           (self.instance.disks,
                                            self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore here errors, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

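  # Note (added for readability): _ExecCleanup above runs only when the opcode
  # was submitted with cleanup=True ("gnt-instance migrate --cleanup"), to
  # recover the configuration and DRBD state after a previously failed
  # migration.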
  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    abort_result = self.rpc.call_instance_finalize_migration_dst(
        self.target_node_uuid, self.instance, self.migration_info,
        False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
        self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
        [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(
          self.instance.primary_node, self.instance, self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    # This fills physical_id slot that may be missing on newly created disks
    for disk in self.instance.disks:
      self.cfg.SetDiskID(disk, self.target_node_uuid)
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
                                           migration_info,
                                           self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(
          self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(
        self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.instance.primary_node = self.target_node_uuid

    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(
        self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

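  # Worked example (added for readability) for the progress feedback in
  # _ExecMigration above: with total_ram=4096 MiB and transferred_ram=1024 MiB
  # the reported value is 100 * 1024.0 / 4096.0 = 25.00 %.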
  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node_uuid = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()