31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes, WarnAboutFailedSshUpdates


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
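  # Ask the node daemon itself whether it has the address configured; this
  # also works for a node that is not yet part of the configuration.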
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
122
124 """Build hooks env.
125
126 This will run on all nodes before, and on all nodes + the new node after.
127
128 """
129 return {
130 "OP_TARGET": self.op.node_name,
131 "NODE_NAME": self.op.node_name,
132 "NODE_PIP": self.op.primary_ip,
133 "NODE_SIP": self.op.secondary_ip,
134 "MASTER_CAPABLE": str(self.op.master_capable),
135 "VM_CAPABLE": str(self.op.vm_capable),
136 }
137
139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148
149 return (hook_nodes, hook_nodes)
150
151 - def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
155 """Check prerequisites.
156
157 This checks:
158 - the new node is not already in the config
159 - it is resolvable
160 - its parameters (single/dual homed) matches the cluster
161
162 Any errors are signaled by raising errors.OpPrereqError.
163
164 """
165 node_name = self.hostname.name
166 self.op.primary_ip = self.hostname.ip
167 if self.op.secondary_ip is None:
168 if self.primary_ip_family == netutils.IP6Address.family:
169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
170 " IPv4 address must be given as secondary",
171 errors.ECODE_INVAL)
172 self.op.secondary_ip = self.op.primary_ip
173
174 secondary_ip = self.op.secondary_ip
175 if not netutils.IP4Address.IsValid(secondary_ip):
176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
177 " address" % secondary_ip, errors.ECODE_INVAL)
178
179 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
180 if not self.op.readd and existing_node_info is not None:
181 raise errors.OpPrereqError("Node %s is already in the configuration" %
182 node_name, errors.ECODE_EXISTS)
183 elif self.op.readd and existing_node_info is None:
184 raise errors.OpPrereqError("Node %s is not in the configuration" %
185 node_name, errors.ECODE_NOENT)
186
187 self.changed_primary_ip = False
188
189 for existing_node in self.cfg.GetAllNodesInfo().values():
190 if self.op.readd and node_name == existing_node.name:
191 if existing_node.secondary_ip != secondary_ip:
192 raise errors.OpPrereqError("Readded node doesn't have the same IP"
193 " address configuration as before",
194 errors.ECODE_INVAL)
195 if existing_node.primary_ip != self.op.primary_ip:
196 self.changed_primary_ip = True
197
198 continue
199
200 if (existing_node.primary_ip == self.op.primary_ip or
201 existing_node.secondary_ip == self.op.primary_ip or
202 existing_node.primary_ip == secondary_ip or
203 existing_node.secondary_ip == secondary_ip):
204 raise errors.OpPrereqError("New node ip address(es) conflict with"
205 " existing node %s" % existing_node.name,
206 errors.ECODE_NOTUNIQUE)
207
208
209
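    # if the capability flags were not given, take them over from the existing
    # node on a readd and default them to True on a fresh add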
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)
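    # the new node must be single- or dual-homed consistently with the master,
    # and both of its addresses must be reachable from the master node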
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
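    # verify that the remote node runs a compatible Ganeti version before any
    # changes are made; at this point only DNS-based RPC is available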
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
        [node_name], vparams, cname,
        self.cfg.GetClusterInfo().hvparams,
        {node_name: self.node_group},
        self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    @type readd: boolean
    @param readd: whether or not this node is readded

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
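      # on a readd, first remove any stale SSH keys of this node from all
      # other nodes; the current key is distributed again below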
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True,
        True,
        False,
        True,
        True)
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # a newly added node is assumed to be powered on
    self.new_node.powered = True

    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")

    self.new_node.master_candidate = self.master_candidate
    if self.changed_primary_ip:
      self.new_node.primary_ip = self.op.primary_ip

    # copy the capability flags from the opcode to the node object
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # add the new node to /etc/hosts on the master node, if requested
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      }

    result = self.rpc.call_node_verify(
      node_verifier_uuids, node_verify_param,
      self.cfg.GetClusterName(),
      self.cfg.GetClusterInfo().hvparams,
      {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
      self.cfg.GetAllNodeGroupsInfoDict()
      )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure the readded node picks up the updated configuration
      self.cfg.Update(self.new_node, feedback_fn)
      # and is demoted if it is no longer a master candidate
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
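    # record the node's client certificate digest in the map of candidate
    # certificates if it is a master candidate, otherwise drop it from there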
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # update the SSH setup of the new node and of all other affected nodes
    if self.op.node_setup:
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)
489 """Modifies the parameters of a node.
490
491 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
492 to the node role (as _ROLE_*)
493 @cvar _R2F: a dictionary from node role to tuples of flags
494 @cvar _FLAGS: a list of attribute names corresponding to the flags
495
496 """
497 HPATH = "node-modify"
498 HTYPE = constants.HTYPE_NODE
499 REQ_BGL = False
500 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
501 _F2R = {
502 (True, False, False): _ROLE_CANDIDATE,
503 (False, True, False): _ROLE_DRAINED,
504 (False, False, True): _ROLE_OFFLINE,
505 (False, False, False): _ROLE_REGULAR,
506 }
507 _R2F = dict((v, k) for k, v in _F2R.items())
508 _FLAGS = ["master_candidate", "drained", "offline"]
509
  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)
598 """Check prerequisites.
599
600 This only checks the instance list against the existing names.
601
602 """
603 node = self.cfg.GetNodeInfo(self.op.node_uuid)
604 if self.lock_instances:
605 affected_instances = \
606 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
607
608
609 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
610 wanted_instance_names = frozenset([inst.name for inst in
611 affected_instances.values()])
612 if wanted_instance_names - owned_instance_names:
613 raise errors.OpPrereqError("Instances affected by changing node %s's"
614 " secondary IP address have changed since"
615 " locks were acquired, wanted '%s', have"
616 " '%s'; retry the operation" %
617 (node.name,
618 utils.CommaJoin(wanted_instance_names),
619 utils.CommaJoin(owned_instance_names)),
620 errors.ECODE_STATE)
621 else:
622 affected_instances = None
623
624 if (self.op.master_candidate is not None or
625 self.op.drained is not None or
626 self.op.offline is not None):
627
628 if node.uuid == self.cfg.GetMasterNode():
629 raise errors.OpPrereqError("The master role can be changed"
630 " only via master-failover",
631 errors.ECODE_INVAL)
632
633 if self.op.master_candidate and not node.master_capable:
634 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
635 " it a master candidate" % node.name,
636 errors.ECODE_STATE)
637
638 if self.op.vm_capable is False:
639 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
640 if ipri or isec:
641 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
642 " the vm_capable flag" % node.name,
643 errors.ECODE_STATE)
644
645 if node.master_candidate and self.might_demote and not self.lock_all:
646 assert not self.op.auto_promote, "auto_promote set but lock_all not"
647
648
649 (mc_remaining, mc_should, _) = \
650 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
651 if mc_remaining < mc_should:
652 raise errors.OpPrereqError("Not enough master candidates, please"
653 " pass auto promote option to allow"
654 " promotion (--auto-promote or RAPI"
655 " auto_promote=True)", errors.ECODE_STATE)
656
657 self.old_flags = old_flags = (node.master_candidate,
658 node.drained, node.offline)
659 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
660 self.old_role = old_role = self._F2R[old_flags]
661
662
663 for attr in self._FLAGS:
664 if getattr(self.op, attr) is False and getattr(node, attr) is False:
665 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
666 setattr(self.op, attr, None)
667
668
669
670
671
672 if SupportsOob(self.cfg, node):
673 if self.op.offline is False and not (node.powered or
674 self.op.powered is True):
675 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
676 " offline status can be reset") %
677 self.op.node_name, errors.ECODE_STATE)
678 elif self.op.powered is not None:
679 raise errors.OpPrereqError(("Unable to change powered state for node %s"
680 " as it does not support out-of-band"
681 " handling") % self.op.node_name,
682 errors.ECODE_STATE)
683
684
685 if (self.op.drained is False or self.op.offline is False or
686 (self.op.master_capable and not node.master_capable)):
687 if _DecideSelfPromotion(self):
688 self.op.master_candidate = True
689 self.LogInfo("Auto-promoting node to master candidate")
690
691
692 if self.op.master_capable is False and node.master_candidate:
693 if self.op.node_uuid == self.cfg.GetMasterNode():
694 raise errors.OpPrereqError("Master must remain master capable",
695 errors.ECODE_STATE)
696 self.LogInfo("Demoting from master candidate")
697 self.op.master_candidate = False
698
699
700 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
701 if self.op.master_candidate:
702 new_role = self._ROLE_CANDIDATE
703 elif self.op.drained:
704 new_role = self._ROLE_DRAINED
705 elif self.op.offline:
706 new_role = self._ROLE_OFFLINE
707 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
708
709
710 new_role = self._ROLE_REGULAR
711 else:
712 new_role = old_role
713
714 self.new_role = new_role
715
716 if old_role == self._ROLE_OFFLINE and new_role != old_role:
717
718 result = self.rpc.call_version([node.uuid])[node.uuid]
719 if result.fail_msg:
720 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
721 " to report its version: %s" %
722 (node.name, result.fail_msg),
723 errors.ECODE_STATE)
724 else:
725 self.LogWarning("Transitioning node from offline to online state"
726 " without using re-add. Please make sure the node"
727 " is healthy!")
728
729
730
731
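    # changing the secondary IP is restricted: single-homed/multi-homed
    # transitions need --force and the master node as target, and the
    # affected instances must be shut down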
    if self.op.secondary_ip:
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # on an online node the affected instances must be shut down and the
        # new address must already be configured and reachable
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # write the parameter changes to the configuration
    self.cfg.Update(node, feedback_fn)

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)
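      # if the node was a master candidate and is not being set offline, tell
      # it to demote itself; this must happen after the configuration update
      # above so that the node stops receiving the configuration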
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # if all nodes were locked, the candidate pool can be adjusted now
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # keep the map of candidate certificates in sync with the new role
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)

      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

    # if the master candidate flag changed, the node is re-added to the
    # cluster context and its SSH keys are adjusted accordingly
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

      if self.cfg.GetClusterInfo().modify_ssh_setup:
        potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
        master_node = self.cfg.GetMasterNode()
        if self.old_role == self._ROLE_CANDIDATE:
          master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
          ssh_result = self.rpc.call_node_ssh_key_remove(
            [master_node],
            node.uuid, node.name,
            master_candidate_uuids, potential_master_candidates,
            True,
            False,
            False,
            False,
            False)
          ssh_result[master_node].Raise(
            "Could not adjust the SSH setup after demoting node '%s'"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

        if self.new_role == self._ROLE_CANDIDATE:
          ssh_result = self.rpc.call_node_ssh_key_add(
            [master_node], node.uuid, node.name,
            potential_master_candidates,
            True,
            True,
            False)
          ssh_result[master_node].Raise(
            "Could not update the SSH setup of node '%s' after promotion"
            " (UUID: %s)." % (node.name, node.uuid))
          WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                cfg.GetInstanceNodes(inst.uuid))


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # declare the needed locks; they are all acquired in shared mode
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }
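    # determine the nodes to lock optimistically; the choice is verified again
    # in CheckPrereq once the locks have been acquired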
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # without an explicit target node, the nodes of the evacuated node's
      # group are candidates
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # always include the evacuated node itself
    return set([self.op.node_uuid]) | group_nodes
  def CheckPrereq(self):
    # verify that the locks acquired in ExpandNames still cover everything
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)
  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # no instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # let the iallocator compute the new placements
      req = iallocator.IAReqNodeEvac(
        evac_mode=self.op.mode, instances=list(self.instance_names),
        ignore_soft_errors=self.op.ignore_soft_errors)
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1164 """Migrate all instances from a node.
1165
1166 """
1167 HPATH = "node-migrate"
1168 HTYPE = constants.HTYPE_NODE
1169 REQ_BGL = False
1170
1173
1175 (self.op.node_uuid, self.op.node_name) = \
1176 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1177
1178 self.share_locks = ShareAll()
1179 self.needed_locks = {
1180 locking.LEVEL_NODE: [self.op.node_uuid],
1181 }
1182
1184 """Build hooks env.
1185
1186 This runs on the master, the primary and all the secondaries.
1187
1188 """
1189 return {
1190 "NODE_NAME": self.op.node_name,
1191 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1192 }
1193
1195 """Build hooks nodes.
1196
1197 """
1198 nl = [self.cfg.GetMasterNode()]
1199 return (nl, nl)
1200
1203
1204 - def Exec(self, feedback_fn):
1205
1206 jobs = [
1207 [opcodes.OpInstanceMigrate(
1208 instance_name=inst.name,
1209 mode=self.op.mode,
1210 live=self.op.live,
1211 iallocator=self.op.iallocator,
1212 target_node=self.op.target_node,
1213 allow_runtime_changes=self.op.allow_runtime_changes,
1214 ignore_ipolicy=self.op.ignore_ipolicy)]
1215 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1216
1217
1218
1219
1220
1221
1222 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1223 frozenset([self.op.node_uuid]))
1224
1225 return ResultWithJobs(jobs)
1226


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type
1417 """Check prerequisites.
1418
1419 """
1420 if self.op.storage_type:
1421 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1422 self.storage_type = self.op.storage_type
1423 else:
1424 self.storage_type = self._DetermineStorageType()
1425 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1426 if self.storage_type not in supported_storage_types:
1427 raise errors.OpPrereqError(
1428 "Storage reporting for storage type '%s' is not supported. Please"
1429 " use the --storage-type option to specify one of the supported"
1430 " storage types (%s) or set the default disk template to one that"
1431 " supports storage reporting." %
1432 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1433
1434 - def Exec(self, feedback_fn):
1435 """Computes the list of nodes and their attributes.
1436
1437 """
1438 if self.op.storage_type:
1439 self.storage_type = self.op.storage_type
1440 else:
1441 self.storage_type = self._DetermineStorageType()
1442
1443 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1444
1445
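    # always query the name field so rows can be keyed by it, but never
    # forward the node and type fields, which only this LU knows about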
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    if modify_ssh_setup:
      # the lists of (potential) master candidates are needed before the node
      # is removed from the configuration
      potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate,
        potential_master_candidate,
        True,
        True,
        False)
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

    # promote other nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.cfg, self.node)

    # run post hooks on the node before removing it
    RunPostHook(self, self.node.name)
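    # the node daemon is addressed by name here because the node has already
    # been removed from the configuration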
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # remove the node's certificate from the candidate certificates
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # remove the node from the master's /etc/hosts and redistribute the
    # ancillary files, if requested
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1627 """Repairs the volume group on a node.
1628
1629 """
1630 REQ_BGL = False
1631
1643
1645 self.needed_locks = {
1646 locking.LEVEL_NODE: [self.op.node_uuid],
1647 }
1648
1664
1666 """Check prerequisites.
1667
1668 """
1669 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1670
1671
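    # check the other nodes of every instance with active disks on this node
    # for faulty disks before touching the storage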
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):