
Source Code for Module ganeti.cmdlib.instance_migration

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with instance migration and failover.""" 
  32   
  33  import logging 
  34  import time 
  35   
  36  from ganeti import constants 
  37  from ganeti import errors 
  38  from ganeti import locking 
  39  from ganeti import hypervisor 
  40  from ganeti.masterd import iallocator 
  41  from ganeti import utils 
  42  from ganeti.cmdlib.base import LogicalUnit, Tasklet 
  43  from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \ 
  44    CheckIAllocatorOrNode, ExpandNodeUuidAndName 
  45  from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \ 
  46    ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks 
  47  from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \ 
  48    CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \ 
  49    CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist 
  50   
  51  import ganeti.masterd.instance 
  52   
  53   
54 -def _ExpandNamesForMigration(lu):
55 """Expands names for use with L{TLMigrateInstance}. 56 57 @type lu: L{LogicalUnit} 58 59 """ 60 if lu.op.target_node is not None: 61 (lu.op.target_node_uuid, lu.op.target_node) = \ 62 ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node) 63 64 lu.needed_locks[locking.LEVEL_NODE] = [] 65 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE 66 lu.dont_collate_locks[locking.LEVEL_NODE] = True 67 68 lu.needed_locks[locking.LEVEL_NODE_RES] = [] 69 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE 70 lu.dont_collate_locks[locking.LEVEL_NODE_RES] = True
71 72
73 -def _DeclareLocksForMigration(lu, level):
74 """Declares locks for L{TLMigrateInstance}. 75 76 @type lu: L{LogicalUnit} 77 @param level: Lock level 78 79 """ 80 if level == locking.LEVEL_NODE: 81 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE) 82 83 instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid) 84 85 disks = lu.cfg.GetInstanceDisks(instance.uuid) 86 if utils.AnyDiskOfType(disks, constants.DTS_EXT_MIRROR): 87 if lu.op.target_node is None: 88 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET 89 else: 90 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, 91 lu.op.target_node_uuid] 92 else: 93 lu._LockInstancesNodes() # pylint: disable=W0212 94 95 assert (lu.needed_locks[locking.LEVEL_NODE] or 96 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET) 97 98 elif level == locking.LEVEL_NODE_RES: 99 # Copy node locks 100 lu.needed_locks[locking.LEVEL_NODE_RES] = \ 101 CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
102 103
104 -class LUInstanceFailover(LogicalUnit):
105 """Failover an instance. 106 107 This is migration by shutting the instance down, but with the disks 108 of the instance already available on the new node. 109 110 See also: 111 L{LUInstanceMove} for moving an instance by copying the data. 112 113 L{LUInstanceMigrate} for the live migration of an instance (no shutdown 114 required). 115 """ 116 HPATH = "instance-failover" 117 HTYPE = constants.HTYPE_INSTANCE 118 REQ_BGL = False 119
120 - def CheckArguments(self):
121 """Check the arguments. 122 123 """ 124 self.iallocator = getattr(self.op, "iallocator", None) 125 self.target_node = getattr(self.op, "target_node", None)
126
127 - def ExpandNames(self):
128 self._ExpandAndLockInstance(allow_forthcoming=True) 129 _ExpandNamesForMigration(self) 130 131 self._migrater = \ 132 TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name, 133 self.op.cleanup, True, False, 134 self.op.ignore_consistency, True, 135 self.op.shutdown_timeout, self.op.ignore_ipolicy, True) 136 137 self.tasklets = [self._migrater]
138
139 - def DeclareLocks(self, level):
140 _DeclareLocksForMigration(self, level)
141
142 - def BuildHooksEnv(self):
143 """Build hooks env. 144 145 This runs on master, primary and secondary nodes of the instance. 146 147 """ 148 instance = self._migrater.instance 149 source_node_uuid = instance.primary_node 150 target_node_uuid = self._migrater.target_node_uuid 151 env = { 152 "IGNORE_CONSISTENCY": self.op.ignore_consistency, 153 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, 154 "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid), 155 "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid), 156 "FAILOVER_CLEANUP": self.op.cleanup, 157 } 158 159 disks = self.cfg.GetInstanceDisks(instance.uuid) 160 if utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR): 161 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 162 env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0]) 163 env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid) 164 else: 165 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = "" 166 167 env.update(BuildInstanceHookEnvByObject(self, instance)) 168 169 return env
170
171 - def BuildHooksNodes(self):
172 """Build hooks nodes. 173 174 """ 175 instance = self._migrater.instance 176 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 177 nl = [self.cfg.GetMasterNode()] + list(secondary_nodes) 178 nl.append(self._migrater.target_node_uuid) 179 return (nl, nl + [instance.primary_node])
180 181
182 -class LUInstanceMigrate(LogicalUnit):
183 """Migrate an instance. 184 185 This is migration without shutting down (live migration) and the disks are 186 already available on the new node. 187 188 See also: 189 L{LUInstanceMove} for moving an instance by copying the data. 190 191 L{LUInstanceFailover} for the migration of an instance where a shutdown is 192 required. 193 """ 194 HPATH = "instance-migrate" 195 HTYPE = constants.HTYPE_INSTANCE 196 REQ_BGL = False 197
198 - def ExpandNames(self):
199 self._ExpandAndLockInstance() 200 _ExpandNamesForMigration(self) 201 202 self._migrater = \ 203 TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name, 204 self.op.cleanup, False, self.op.allow_failover, False, 205 self.op.allow_runtime_changes, 206 constants.DEFAULT_SHUTDOWN_TIMEOUT, 207 self.op.ignore_ipolicy, self.op.ignore_hvversions) 208 209 self.tasklets = [self._migrater]
210
211 - def DeclareLocks(self, level):
212 _DeclareLocksForMigration(self, level)
213
214 - def BuildHooksEnv(self):
215 """Build hooks env. 216 217 This runs on master, primary and secondary nodes of the instance. 218 219 """ 220 instance = self._migrater.instance 221 source_node_uuid = instance.primary_node 222 target_node_uuid = self._migrater.target_node_uuid 223 env = BuildInstanceHookEnvByObject(self, instance) 224 env.update({ 225 "MIGRATE_LIVE": self._migrater.live, 226 "MIGRATE_CLEANUP": self.op.cleanup, 227 "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid), 228 "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid), 229 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, 230 }) 231 232 disks = self.cfg.GetInstanceDisks(instance.uuid) 233 if utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR): 234 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 235 env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0]) 236 env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid) 237 else: 238 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = "" 239 240 return env
241
242 - def BuildHooksNodes(self):
243 """Build hooks nodes. 244 245 """ 246 instance = self._migrater.instance 247 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 248 snode_uuids = list(secondary_nodes) 249 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids 250 nl.append(self._migrater.target_node_uuid) 251 return (nl, nl)
252 253
254 -class TLMigrateInstance(Tasklet):
255 """Tasklet class for instance migration. 256 257 @type live: boolean 258 @ivar live: whether the migration will be done live or non-live; 259 this variable is initalized only after CheckPrereq has run 260 @type cleanup: boolean 261 @ivar cleanup: Wheater we cleanup from a failed migration 262 @type iallocator: string 263 @ivar iallocator: The iallocator used to determine target_node 264 @type target_node_uuid: string 265 @ivar target_node_uuid: If given, the target node UUID to reallocate the 266 instance to 267 @type failover: boolean 268 @ivar failover: Whether operation results in failover or migration 269 @type fallback: boolean 270 @ivar fallback: Whether fallback to failover is allowed if migration not 271 possible 272 @type ignore_consistency: boolean 273 @ivar ignore_consistency: Wheter we should ignore consistency between source 274 and target node 275 @type shutdown_timeout: int 276 @ivar shutdown_timeout: In case of failover timeout of the shutdown 277 @type ignore_ipolicy: bool 278 @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating 279 @type ignore_hvversions: bool 280 @ivar ignore_hvversions: If true, accept incompatible hypervisor versions 281 282 """ 283 284 # Constants 285 _MIGRATION_POLL_INTERVAL = 1 # seconds 286 _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds 287
288 - def __init__(self, lu, instance_uuid, instance_name, cleanup, failover, 289 fallback, ignore_consistency, allow_runtime_changes, 290 shutdown_timeout, ignore_ipolicy, ignore_hvversions):
291 """Initializes this class. 292 293 """ 294 Tasklet.__init__(self, lu) 295 296 # Parameters 297 self.instance_uuid = instance_uuid 298 self.instance_name = instance_name 299 self.cleanup = cleanup 300 self.live = False # will be overridden later 301 self.failover = failover 302 self.fallback = fallback 303 self.ignore_consistency = ignore_consistency 304 self.shutdown_timeout = shutdown_timeout 305 self.ignore_ipolicy = ignore_ipolicy 306 self.allow_runtime_changes = allow_runtime_changes 307 self.ignore_hvversions = ignore_hvversions
308
309 - def CheckPrereq(self):
310 """Check prerequisites. 311 312 This checks that the instance is in the cluster. 313 314 """ 315 (self.instance_uuid, self.instance_name) = \ 316 ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid, 317 self.instance_name) 318 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid) 319 assert self.instance is not None 320 cluster = self.cfg.GetClusterInfo() 321 322 if (not self.cleanup and 323 not self.instance.admin_state == constants.ADMINST_UP and 324 not self.failover and self.fallback): 325 self.lu.LogInfo("Instance is marked down or offline, fallback allowed," 326 " switching to failover") 327 self.failover = True 328 329 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 330 331 if not utils.AllDiskOfType(disks, constants.DTS_MIRRORED): 332 if self.failover: 333 text = "failovers" 334 else: 335 text = "migrations" 336 invalid_disks = set(d.dev_type for d in disks 337 if d.dev_type not in constants.DTS_MIRRORED) 338 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow" 339 " %s" % (utils.CommaJoin(invalid_disks), text), 340 errors.ECODE_STATE) 341 342 # TODO allow heterogeneous disk types if all are mirrored in some way. 343 if utils.AllDiskOfType(disks, constants.DTS_EXT_MIRROR): 344 CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") 345 346 if self.lu.op.iallocator: 347 self._RunAllocator() 348 else: 349 # We set set self.target_node_uuid as it is required by 350 # BuildHooksEnv 351 self.target_node_uuid = self.lu.op.target_node_uuid 352 353 # Check that the target node is correct in terms of instance policy 354 nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid) 355 group_info = self.cfg.GetNodeGroup(nodeinfo.group) 356 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, 357 group_info) 358 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo, 359 self.cfg, ignore=self.ignore_ipolicy) 360 361 # self.target_node is already populated, either directly or by the 362 # iallocator run 363 target_node_uuid = self.target_node_uuid 364 if self.target_node_uuid == self.instance.primary_node: 365 raise errors.OpPrereqError( 366 "Cannot migrate instance %s to its primary (%s)" % 367 (self.instance.name, 368 self.cfg.GetNodeName(self.instance.primary_node)), 369 errors.ECODE_STATE) 370 371 if len(self.lu.tasklets) == 1: 372 # It is safe to release locks only when we're the only tasklet 373 # in the LU 374 ReleaseLocks(self.lu, locking.LEVEL_NODE, 375 keep=[self.instance.primary_node, self.target_node_uuid]) 376 377 elif utils.AllDiskOfType(disks, constants.DTS_INT_MIRROR): 378 templates = [d.dev_type for d in disks] 379 secondary_node_uuids = \ 380 self.cfg.GetInstanceSecondaryNodes(self.instance.uuid) 381 if not secondary_node_uuids: 382 raise errors.ConfigurationError("No secondary node but using" 383 " %s disk types" % 384 utils.CommaJoin(set(templates))) 385 self.target_node_uuid = target_node_uuid = secondary_node_uuids[0] 386 if self.lu.op.iallocator or \ 387 (self.lu.op.target_node_uuid and 388 self.lu.op.target_node_uuid != target_node_uuid): 389 if self.failover: 390 text = "failed over" 391 else: 392 text = "migrated" 393 raise errors.OpPrereqError("Instances with disk types %s cannot" 394 " be %s to arbitrary nodes" 395 " (neither an iallocator nor a target" 396 " node can be passed)" % 397 (utils.CommaJoin(set(templates)), text), 398 errors.ECODE_INVAL) 399 nodeinfo = self.cfg.GetNodeInfo(target_node_uuid) 400 group_info = self.cfg.GetNodeGroup(nodeinfo.group) 401 ipolicy = 
ganeti.masterd.instance.CalculateGroupIPolicy(cluster, 402 group_info) 403 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo, 404 self.cfg, ignore=self.ignore_ipolicy) 405 406 else: 407 raise errors.OpPrereqError("Instance mixes internal and external " 408 "mirroring. This is not currently supported.") 409 410 i_be = cluster.FillBE(self.instance) 411 412 # check memory requirements on the secondary node 413 if (not self.cleanup and 414 (not self.failover or 415 self.instance.admin_state == constants.ADMINST_UP)): 416 self.tgt_free_mem = CheckNodeFreeMemory( 417 self.lu, target_node_uuid, 418 "migrating instance %s" % self.instance.name, 419 i_be[constants.BE_MINMEM], self.instance.hypervisor, 420 self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor]) 421 else: 422 self.lu.LogInfo("Not checking memory on the secondary node as" 423 " instance will not be started") 424 425 # check if failover must be forced instead of migration 426 if (not self.cleanup and not self.failover and 427 i_be[constants.BE_ALWAYS_FAILOVER]): 428 self.lu.LogInfo("Instance configured to always failover; fallback" 429 " to failover") 430 self.failover = True 431 432 # check bridge existance 433 CheckInstanceBridgesExist(self.lu, self.instance, 434 node_uuid=target_node_uuid) 435 436 if not self.cleanup: 437 CheckNodeNotDrained(self.lu, target_node_uuid) 438 if not self.failover: 439 result = self.rpc.call_instance_migratable(self.instance.primary_node, 440 self.instance) 441 if result.fail_msg and self.fallback: 442 self.lu.LogInfo("Can't migrate, instance offline, fallback to" 443 " failover") 444 self.failover = True 445 else: 446 result.Raise("Can't migrate, please use failover", 447 prereq=True, ecode=errors.ECODE_STATE) 448 449 assert not (self.failover and self.cleanup) 450 451 if not self.failover: 452 if self.lu.op.live is not None and self.lu.op.mode is not None: 453 raise errors.OpPrereqError("Only one of the 'live' and 'mode'" 454 " parameters are accepted", 455 errors.ECODE_INVAL) 456 if self.lu.op.live is not None: 457 if self.lu.op.live: 458 self.lu.op.mode = constants.HT_MIGRATION_LIVE 459 else: 460 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE 461 # reset the 'live' parameter to None so that repeated 462 # invocations of CheckPrereq do not raise an exception 463 self.lu.op.live = None 464 elif self.lu.op.mode is None: 465 # read the default value from the hypervisor 466 i_hv = cluster.FillHV(self.instance, skip_globals=False) 467 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] 468 469 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE 470 else: 471 # Failover is never live 472 self.live = False 473 474 if not (self.failover or self.cleanup): 475 remote_info = self.rpc.call_instance_info( 476 self.instance.primary_node, self.instance.name, 477 self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor]) 478 remote_info.Raise("Error checking instance on node %s" % 479 self.cfg.GetNodeName(self.instance.primary_node), 480 prereq=True) 481 instance_running = bool(remote_info.payload) 482 if instance_running: 483 self.current_mem = int(remote_info.payload["memory"])
484
485 - def _RunAllocator(self):
486 """Run the allocator based on input opcode. 487 488 """ 489 # FIXME: add a self.ignore_ipolicy option 490 req = iallocator.IAReqRelocate( 491 inst_uuid=self.instance_uuid, 492 relocate_from_node_uuids=[self.instance.primary_node]) 493 ial = iallocator.IAllocator(self.cfg, self.rpc, req) 494 495 ial.Run(self.lu.op.iallocator) 496 497 if not ial.success: 498 raise errors.OpPrereqError("Can't compute nodes using" 499 " iallocator '%s': %s" % 500 (self.lu.op.iallocator, ial.info), 501 errors.ECODE_NORES) 502 self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid 503 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s", 504 self.instance_name, self.lu.op.iallocator, 505 utils.CommaJoin(ial.result))
506
507 - def _WaitUntilSync(self):
508 """Poll with custom rpc for disk sync. 509 510 This uses our own step-based rpc call. 511 512 """ 513 self.feedback_fn("* wait until resync is done") 514 all_done = False 515 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 516 while not all_done: 517 all_done = True 518 result = self.rpc.call_drbd_wait_sync(self.all_node_uuids, 519 (disks, self.instance)) 520 min_percent = 100 521 for node_uuid, nres in result.items(): 522 nres.Raise("Cannot resync disks on node %s" % 523 self.cfg.GetNodeName(node_uuid)) 524 node_done, node_percent = nres.payload 525 all_done = all_done and node_done 526 if node_percent is not None: 527 min_percent = min(min_percent, node_percent) 528 if not all_done: 529 if min_percent < 100: 530 self.feedback_fn(" - progress: %.1f%%" % min_percent) 531 time.sleep(2)
532
533 - def _OpenInstanceDisks(self, node_uuid, exclusive):
534 """Open instance disks. 535 536 """ 537 if exclusive: 538 mode = "in exclusive mode" 539 else: 540 mode = "in shared mode" 541 542 node_name = self.cfg.GetNodeName(node_uuid) 543 544 self.feedback_fn("* opening instance disks on node %s %s" % 545 (node_name, mode)) 546 547 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 548 result = self.rpc.call_blockdev_open(node_uuid, self.instance.name, 549 (disks, self.instance), exclusive) 550 result.Raise("Cannot open instance disks on node %s" % node_name)
551
552 - def _CloseInstanceDisks(self, node_uuid):
553 """Close instance disks. 554 555 """ 556 node_name = self.cfg.GetNodeName(node_uuid) 557 558 self.feedback_fn("* closing instance disks on node %s" % node_name) 559 560 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 561 result = self.rpc.call_blockdev_close(node_uuid, self.instance.name, 562 (disks, self.instance)) 563 msg = result.fail_msg 564 if msg: 565 if result.offline or self.ignore_consistency: 566 self.lu.LogWarning("Could not close instance disks on node %s," 567 " proceeding anyway" % node_name) 568 else: 569 raise errors.OpExecError("Cannot close instance disks on node %s: %s" % 570 (node_name, msg))
571
572 - def _GoStandalone(self):
573 """Disconnect from the network. 574 575 """ 576 self.feedback_fn("* changing into standalone mode") 577 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 578 result = self.rpc.call_drbd_disconnect_net( 579 self.all_node_uuids, (disks, self.instance)) 580 for node_uuid, nres in result.items(): 581 nres.Raise("Cannot disconnect disks node %s" % 582 self.cfg.GetNodeName(node_uuid))
583
584 - def _GoReconnect(self, multimaster):
585 """Reconnect to the network. 586 587 """ 588 if multimaster: 589 msg = "dual-master" 590 else: 591 msg = "single-master" 592 self.feedback_fn("* changing disks into %s mode" % msg) 593 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 594 result = self.rpc.call_drbd_attach_net(self.all_node_uuids, 595 (disks, self.instance), 596 multimaster) 597 for node_uuid, nres in result.items(): 598 nres.Raise("Cannot change disks config on node %s" % 599 self.cfg.GetNodeName(node_uuid))
600
601 - def _ExecCleanup(self):
602 """Try to cleanup after a failed migration. 603 604 The cleanup is done by: 605 - check that the instance is running only on one node 606 (and update the config if needed) 607 - change disks on its secondary node to secondary 608 - wait until disks are fully synchronized 609 - disconnect from the network 610 - change disks into single-master mode 611 - wait again until disks are fully synchronized 612 613 """ 614 # check running on only one node 615 self.feedback_fn("* checking where the instance actually runs" 616 " (if this hangs, the hypervisor might be in" 617 " a bad state)") 618 cluster_hvparams = self.cfg.GetClusterInfo().hvparams 619 ins_l = self.rpc.call_instance_list(self.all_node_uuids, 620 [self.instance.hypervisor], 621 cluster_hvparams) 622 for node_uuid, result in ins_l.items(): 623 result.Raise("Can't contact node %s" % node_uuid) 624 625 runningon_source = self.instance.name in \ 626 ins_l[self.source_node_uuid].payload 627 runningon_target = self.instance.name in \ 628 ins_l[self.target_node_uuid].payload 629 630 if runningon_source and runningon_target: 631 raise errors.OpExecError("Instance seems to be running on two nodes," 632 " or the hypervisor is confused; you will have" 633 " to ensure manually that it runs only on one" 634 " and restart this operation") 635 636 if not (runningon_source or runningon_target): 637 raise errors.OpExecError("Instance does not seem to be running at all;" 638 " in this case it's safer to repair by" 639 " running 'gnt-instance stop' to ensure disk" 640 " shutdown, and then restarting it") 641 642 if runningon_target: 643 # the migration has actually succeeded, we need to update the config 644 self.feedback_fn("* instance running on secondary node (%s)," 645 " updating config" % 646 self.cfg.GetNodeName(self.target_node_uuid)) 647 self.cfg.SetInstancePrimaryNode(self.instance.uuid, 648 self.target_node_uuid) 649 demoted_node_uuid = self.source_node_uuid 650 else: 651 self.feedback_fn("* instance confirmed to be running on its" 652 " primary node (%s)" % 653 self.cfg.GetNodeName(self.source_node_uuid)) 654 demoted_node_uuid = self.target_node_uuid 655 656 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 657 658 self._CloseInstanceDisks(demoted_node_uuid) 659 660 if utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR): 661 try: 662 self._WaitUntilSync() 663 except errors.OpExecError: 664 # we ignore here errors, since if the device is standalone, it 665 # won't be able to sync 666 pass 667 self._GoStandalone() 668 self._GoReconnect(False) 669 self._WaitUntilSync() 670 elif utils.AnyDiskOfType(disks, constants.DTS_EXT_MIRROR): 671 self._OpenInstanceDisks(self.instance.primary_node, True) 672 673 self.feedback_fn("* done")
674
675 - def _RevertDiskStatus(self):
676 """Try to revert the disk status after a failed migration. 677 678 """ 679 680 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 681 682 self._CloseInstanceDisks(self.target_node_uuid) 683 684 if utils.AllDiskOfType(disks, constants.DTS_EXT_MIRROR): 685 self._OpenInstanceDisks(self.source_node_uuid, True) 686 return 687 688 try: 689 self._GoStandalone() 690 self._GoReconnect(False) 691 self._WaitUntilSync() 692 except errors.OpExecError, err: 693 self.lu.LogWarning("Migration failed and I can't reconnect the drives," 694 " please try to recover the instance manually;" 695 " error '%s'" % str(err))
696
697 - def _AbortMigration(self):
698 """Call the hypervisor code to abort a started migration. 699 700 """ 701 abort_result = self.rpc.call_instance_finalize_migration_dst( 702 self.target_node_uuid, self.instance, self.migration_info, 703 False) 704 abort_msg = abort_result.fail_msg 705 if abort_msg: 706 logging.error("Aborting migration failed on target node %s: %s", 707 self.cfg.GetNodeName(self.target_node_uuid), abort_msg) 708 # Don't raise an exception here, as we stil have to try to revert the 709 # disk status, even if this step failed. 710 711 abort_result = self.rpc.call_instance_finalize_migration_src( 712 self.source_node_uuid, self.instance, False, self.live) 713 abort_msg = abort_result.fail_msg 714 if abort_msg: 715 logging.error("Aborting migration failed on source node %s: %s", 716 self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
717
718 - def _ExecMigration(self):
719 """Migrate an instance. 720 721 The migrate is done by: 722 - change the disks into dual-master mode 723 - wait until disks are fully synchronized again 724 - migrate the instance 725 - change disks on the new secondary node (the old primary) to secondary 726 - wait until disks are fully synchronized 727 - change disks into single-master mode 728 729 """ 730 # Check for hypervisor version mismatch and warn the user. 731 hvspecs = [(self.instance.hypervisor, 732 self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])] 733 nodeinfo = self.rpc.call_node_info( 734 [self.source_node_uuid, self.target_node_uuid], None, hvspecs) 735 for ninfo in nodeinfo.values(): 736 ninfo.Raise("Unable to retrieve node information from node '%s'" % 737 ninfo.node) 738 (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload 739 (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload 740 741 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and 742 (constants.HV_NODEINFO_KEY_VERSION in dst_info)): 743 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION] 744 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION] 745 if src_version != dst_version: 746 self.feedback_fn("* warning: hypervisor version mismatch between" 747 " source (%s) and target (%s) node" % 748 (src_version, dst_version)) 749 hv = hypervisor.GetHypervisorClass(self.instance.hypervisor) 750 if hv.VersionsSafeForMigration(src_version, dst_version): 751 self.feedback_fn(" migrating from hypervisor version %s to %s should" 752 " be safe" % (src_version, dst_version)) 753 else: 754 self.feedback_fn(" migrating from hypervisor version %s to %s is" 755 " likely unsupported" % (src_version, dst_version)) 756 if self.ignore_hvversions: 757 self.feedback_fn(" continuing anyway (told to ignore version" 758 " mismatch)") 759 else: 760 raise errors.OpExecError("Unsupported migration between hypervisor" 761 " versions (%s to %s)" % 762 (src_version, dst_version)) 763 764 self.feedback_fn("* checking disk consistency between source and target") 765 for (idx, dev) in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)): 766 if not CheckDiskConsistency(self.lu, self.instance, dev, 767 self.target_node_uuid, 768 False): 769 raise errors.OpExecError("Disk %s is degraded or not fully" 770 " synchronized on target node," 771 " aborting migration" % idx) 772 773 if self.current_mem > self.tgt_free_mem: 774 if not self.allow_runtime_changes: 775 raise errors.OpExecError("Memory ballooning not allowed and not enough" 776 " free memory to fit instance %s on target" 777 " node %s (have %dMB, need %dMB)" % 778 (self.instance.name, 779 self.cfg.GetNodeName(self.target_node_uuid), 780 self.tgt_free_mem, self.current_mem)) 781 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem) 782 rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node, 783 self.instance, 784 self.tgt_free_mem) 785 rpcres.Raise("Cannot modify instance runtime memory") 786 787 # First get the migration information from the remote node 788 result = self.rpc.call_migration_info(self.source_node_uuid, self.instance) 789 msg = result.fail_msg 790 if msg: 791 log_err = ("Failed fetching source migration information from %s: %s" % 792 (self.cfg.GetNodeName(self.source_node_uuid), msg)) 793 logging.error(log_err) 794 raise errors.OpExecError(log_err) 795 796 self.migration_info = migration_info = result.payload 797 798 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 799 800 self._CloseInstanceDisks(self.target_node_uuid) 801 802 if 
utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR): 803 # Then switch the disks to master/master mode 804 self._GoStandalone() 805 self._GoReconnect(True) 806 self._WaitUntilSync() 807 808 self._OpenInstanceDisks(self.source_node_uuid, False) 809 self._OpenInstanceDisks(self.target_node_uuid, False) 810 811 self.feedback_fn("* preparing %s to accept the instance" % 812 self.cfg.GetNodeName(self.target_node_uuid)) 813 result = self.rpc.call_accept_instance(self.target_node_uuid, 814 self.instance, 815 migration_info, 816 self.nodes_ip[self.target_node_uuid]) 817 818 msg = result.fail_msg 819 if msg: 820 logging.error("Instance pre-migration failed, trying to revert" 821 " disk status: %s", msg) 822 self.feedback_fn("Pre-migration failed, aborting") 823 self._AbortMigration() 824 self._RevertDiskStatus() 825 raise errors.OpExecError("Could not pre-migrate instance %s: %s" % 826 (self.instance.name, msg)) 827 828 self.feedback_fn("* migrating instance to %s" % 829 self.cfg.GetNodeName(self.target_node_uuid)) 830 cluster = self.cfg.GetClusterInfo() 831 result = self.rpc.call_instance_migrate( 832 self.source_node_uuid, cluster.cluster_name, self.instance, 833 self.nodes_ip[self.target_node_uuid], self.live) 834 msg = result.fail_msg 835 if msg: 836 logging.error("Instance migration failed, trying to revert" 837 " disk status: %s", msg) 838 self.feedback_fn("Migration failed, aborting") 839 self._AbortMigration() 840 self._RevertDiskStatus() 841 raise errors.OpExecError("Could not migrate instance %s: %s" % 842 (self.instance.name, msg)) 843 844 self.feedback_fn("* starting memory transfer") 845 last_feedback = time.time() 846 while True: 847 result = self.rpc.call_instance_get_migration_status( 848 self.source_node_uuid, self.instance) 849 msg = result.fail_msg 850 ms = result.payload # MigrationStatus instance 851 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES): 852 logging.error("Instance migration failed, trying to revert" 853 " disk status: %s", msg) 854 self.feedback_fn("Migration failed, aborting") 855 self._AbortMigration() 856 self._RevertDiskStatus() 857 if not msg: 858 msg = "hypervisor returned failure" 859 raise errors.OpExecError("Could not migrate instance %s: %s" % 860 (self.instance.name, msg)) 861 862 if result.payload.status != constants.HV_MIGRATION_ACTIVE: 863 self.feedback_fn("* memory transfer complete") 864 break 865 866 if (utils.TimeoutExpired(last_feedback, 867 self._MIGRATION_FEEDBACK_INTERVAL) and 868 ms.transferred_ram is not None): 869 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram) 870 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress) 871 last_feedback = time.time() 872 873 time.sleep(self._MIGRATION_POLL_INTERVAL) 874 875 result = self.rpc.call_instance_finalize_migration_src( 876 self.source_node_uuid, self.instance, True, self.live) 877 msg = result.fail_msg 878 if msg: 879 logging.error("Instance migration succeeded, but finalization failed" 880 " on the source node: %s", msg) 881 raise errors.OpExecError("Could not finalize instance migration: %s" % 882 msg) 883 884 self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid) 885 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid) 886 disks = self.cfg.GetInstanceDisks(self.instance_uuid) 887 888 result = self.rpc.call_instance_finalize_migration_dst( 889 self.target_node_uuid, self.instance, migration_info, True) 890 msg = result.fail_msg 891 if msg: 892 logging.error("Instance migration succeeded, but finalization failed" 893 
" on the target node: %s", msg) 894 raise errors.OpExecError("Could not finalize instance migration: %s" % 895 msg) 896 897 self._CloseInstanceDisks(self.source_node_uuid) 898 899 if utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR): 900 self._WaitUntilSync() 901 self._GoStandalone() 902 self._GoReconnect(False) 903 self._WaitUntilSync() 904 elif utils.AnyDiskOfType(disks, constants.DTS_EXT_MIRROR): 905 self._OpenInstanceDisks(self.target_node_uuid, True) 906 907 # If the instance's disk template is `rbd' or `ext' and there was a 908 # successful migration, unmap the device from the source node. 909 unmap_types = (constants.DT_RBD, constants.DT_EXT) 910 911 if utils.AnyDiskOfType(disks, unmap_types): 912 unmap_disks = [d for d in disks if d.dev_type in unmap_types] 913 disks = ExpandCheckDisks(unmap_disks, unmap_disks) 914 self.feedback_fn("* unmapping instance's disks %s from %s" % 915 (utils.CommaJoin(d.name for d in unmap_disks), 916 self.cfg.GetNodeName(self.source_node_uuid))) 917 for disk in disks: 918 result = self.rpc.call_blockdev_shutdown(self.source_node_uuid, 919 (disk, self.instance)) 920 msg = result.fail_msg 921 if msg: 922 logging.error("Migration was successful, but couldn't unmap the" 923 " block device %s on source node %s: %s", 924 disk.iv_name, 925 self.cfg.GetNodeName(self.source_node_uuid), msg) 926 logging.error("You need to unmap the device %s manually on %s", 927 disk.iv_name, 928 self.cfg.GetNodeName(self.source_node_uuid)) 929 930 self.feedback_fn("* done")
931
932 - def _ExecFailover(self):
933 """Failover an instance. 934 935 The failover is done by shutting it down on its present node and 936 starting it on the secondary. 937 938 """ 939 if self.instance.forthcoming: 940 self.feedback_fn("Instance is forthcoming, just updating the" 941 " configuration") 942 self.cfg.SetInstancePrimaryNode(self.instance.uuid, 943 self.target_node_uuid) 944 return 945 946 primary_node = self.cfg.GetNodeInfo(self.instance.primary_node) 947 948 source_node_uuid = self.instance.primary_node 949 950 if self.instance.disks_active: 951 self.feedback_fn("* checking disk consistency between source and target") 952 inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid) 953 for (idx, dev) in enumerate(inst_disks): 954 # for drbd, these are drbd over lvm 955 if not CheckDiskConsistency(self.lu, self.instance, dev, 956 self.target_node_uuid, False): 957 if primary_node.offline: 958 self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" 959 " target node %s" % 960 (primary_node.name, idx, 961 self.cfg.GetNodeName(self.target_node_uuid))) 962 elif not self.ignore_consistency: 963 raise errors.OpExecError("Disk %s is degraded on target node," 964 " aborting failover" % idx) 965 else: 966 self.feedback_fn("* not checking disk consistency as instance is not" 967 " running") 968 969 self.feedback_fn("* shutting down instance on source node") 970 logging.info("Shutting down instance %s on node %s", 971 self.instance.name, self.cfg.GetNodeName(source_node_uuid)) 972 973 result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance, 974 self.shutdown_timeout, 975 self.lu.op.reason) 976 msg = result.fail_msg 977 if msg: 978 if self.ignore_consistency or primary_node.offline: 979 self.lu.LogWarning("Could not shutdown instance %s on node %s," 980 " proceeding anyway; please make sure node" 981 " %s is down; error details: %s", 982 self.instance.name, 983 self.cfg.GetNodeName(source_node_uuid), 984 self.cfg.GetNodeName(source_node_uuid), msg) 985 else: 986 raise errors.OpExecError("Could not shutdown instance %s on" 987 " node %s: %s" % 988 (self.instance.name, 989 self.cfg.GetNodeName(source_node_uuid), msg)) 990 991 disk_template = self.cfg.GetInstanceDiskTemplate(self.instance.uuid) 992 if disk_template in constants.DTS_EXT_MIRROR: 993 self._CloseInstanceDisks(source_node_uuid) 994 995 self.feedback_fn("* deactivating the instance's disks on source node") 996 if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True): 997 raise errors.OpExecError("Can't shut down the instance's disks") 998 999 self.cfg.SetInstancePrimaryNode(self.instance.uuid, self.target_node_uuid) 1000 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid) 1001 1002 # Only start the instance if it's marked as up 1003 if self.instance.admin_state == constants.ADMINST_UP: 1004 self.feedback_fn("* activating the instance's disks on target node %s" % 1005 self.cfg.GetNodeName(self.target_node_uuid)) 1006 logging.info("Starting instance %s on node %s", self.instance.name, 1007 self.cfg.GetNodeName(self.target_node_uuid)) 1008 1009 disks_ok, _, _ = AssembleInstanceDisks(self.lu, self.instance, 1010 ignore_secondaries=True) 1011 if not disks_ok: 1012 ShutdownInstanceDisks(self.lu, self.instance) 1013 raise errors.OpExecError("Can't activate the instance's disks") 1014 1015 self.feedback_fn("* starting the instance on the target node %s" % 1016 self.cfg.GetNodeName(self.target_node_uuid)) 1017 result = self.rpc.call_instance_start(self.target_node_uuid, 1018 (self.instance, None, None), False, 1019 
self.lu.op.reason) 1020 msg = result.fail_msg 1021 if msg: 1022 ShutdownInstanceDisks(self.lu, self.instance) 1023 raise errors.OpExecError("Could not start instance %s on node %s: %s" % 1024 (self.instance.name, 1025 self.cfg.GetNodeName(self.target_node_uuid), 1026 msg))
1027
1028 - def Exec(self, feedback_fn):
1029 """Perform the migration. 1030 1031 """ 1032 self.feedback_fn = feedback_fn 1033 self.source_node_uuid = self.instance.primary_node 1034 1035 # FIXME: if we implement migrate-to-any in DRBD, this needs fixing 1036 disks = self.cfg.GetInstanceDisks(self.instance.uuid) 1037 1038 # TODO allow mixed disks 1039 if utils.AllDiskOfType(disks, constants.DTS_INT_MIRROR): 1040 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid) 1041 self.target_node_uuid = secondary_nodes[0] 1042 # Otherwise self.target_node has been populated either 1043 # directly, or through an iallocator. 1044 1045 self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid] 1046 self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node) 1047 in self.cfg.GetMultiNodeInfo(self.all_node_uuids)) 1048 1049 if self.failover: 1050 feedback_fn("Failover instance %s" % self.instance.name) 1051 self._ExecFailover() 1052 else: 1053 feedback_fn("Migrating instance %s" % self.instance.name) 1054 1055 if self.cleanup: 1056 return self._ExecCleanup() 1057 else: 1058 return self._ExecMigration()
1059
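
The logical units above are not invoked directly; they run on the master daemon as part of an instance migration or failover opcode, for example when an operator runs "gnt-instance migrate" or "gnt-instance failover". The sketch below shows a programmatic submission; it assumes the OpInstanceMigrate/OpInstanceFailover opcode fields referenced by these LUs and the ganeti.cli.SubmitOpCode helper, and exact field names may differ between Ganeti versions.

  # Sketch only: submit opcodes that are executed by LUInstanceMigrate /
  # LUInstanceFailover (and TLMigrateInstance) on the master daemon.
  from ganeti import constants
  from ganeti import opcodes
  from ganeti import cli

  # Live-migrate; the target node comes from the DRBD secondary or, for
  # externally mirrored disks, from an iallocator or explicit target node.
  op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
                                 mode=constants.HT_MIGRATION_LIVE,
                                 cleanup=False)
  cli.SubmitOpCode(op)

  # Fail over: shut the instance down on the primary and start it on the
  # secondary, ignoring disk consistency checks.
  op = opcodes.OpInstanceFailover(instance_name="inst1.example.com",
                                  ignore_consistency=True)
  cli.SubmitOpCode(op)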