31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes, WarnAboutFailedSshUpdates
57
58
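# NOTE: the helper _DecideSelfPromotion was elided from this excerpt, but it
# is called from LUNodeAdd.CheckPrereq and LUNodeSetParams.CheckPrereq below.
# A minimal sketch of the logic those call sites rely on (compare the current
# number of master candidates against the desired pool size); the exact body
# is an assumption:
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether a node should promote itself to master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase the candidate pool maximum by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should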


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
71 """Ensure that a node has the given secondary ip.
72
73 @type lu: L{LogicalUnit}
74 @param lu: the LU on behalf of which we make the check
75 @type node: L{objects.Node}
76 @param node: the node to check
77 @type secondary_ip: string
78 @param secondary_ip: the ip to check
79 @type prereq: boolean
80 @param prereq: whether to throw a prerequisite or an execute error
81 @raise errors.OpPrereqError: if the node doesn't have the ip,
82 and prereq=True
83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
84
85 """
86
87
88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
89 result.Raise("Failure checking secondary ip on node %s" % node.name,
90 prereq=prereq, ecode=errors.ECODE_ENVIRON)
91 if not result.payload:
92 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
93 " please fix and re-run this command" % secondary_ip)
94 if prereq:
95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
96 else:
97 raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

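  # NOTE: CheckArguments and ExpandNames were elided from this excerpt.
  # CheckPrereq below relies on self.hostname and self.primary_ip_family,
  # which they must establish; a plausible sketch (exact calls are
  # assumptions):
  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # resolve the new node's name early; CheckPrereq uses self.hostname later
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name
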
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148
149 return (hook_nodes, hook_nodes)
150
151 - def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
155 """Check prerequisites.
156
157 This checks:
158 - the new node is not already in the config
159 - it is resolvable
160 - its parameters (single/dual homed) matches the cluster
161
162 Any errors are signaled by raising errors.OpPrereqError.
163
164 """
165 node_name = self.hostname.name
166 self.op.primary_ip = self.hostname.ip
167 if self.op.secondary_ip is None:
168 if self.primary_ip_family == netutils.IP6Address.family:
169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
170 " IPv4 address must be given as secondary",
171 errors.ECODE_INVAL)
172 self.op.secondary_ip = self.op.primary_ip
173
174 secondary_ip = self.op.secondary_ip
175 if not netutils.IP4Address.IsValid(secondary_ip):
176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
177 " address" % secondary_ip, errors.ECODE_INVAL)
178
179 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
180 if not self.op.readd and existing_node_info is not None:
181 raise errors.OpPrereqError("Node %s is already in the configuration" %
182 node_name, errors.ECODE_EXISTS)
183 elif self.op.readd and existing_node_info is None:
184 raise errors.OpPrereqError("Node %s is not in the configuration" %
185 node_name, errors.ECODE_NOENT)
186
187 self.changed_primary_ip = False
188
189 for existing_node in self.cfg.GetAllNodesInfo().values():
190 if self.op.readd and node_name == existing_node.name:
191 if existing_node.secondary_ip != secondary_ip:
192 raise errors.OpPrereqError("Readded node doesn't have the same IP"
193 " address configuration as before",
194 errors.ECODE_INVAL)
195 if existing_node.primary_ip != self.op.primary_ip:
196 self.changed_primary_ip = True
197
198 continue
199
200 if (existing_node.primary_ip == self.op.primary_ip or
201 existing_node.secondary_ip == self.op.primary_ip or
202 existing_node.primary_ip == secondary_ip or
203 existing_node.secondary_ip == secondary_ip):
204 raise errors.OpPrereqError("New node ip address(es) conflict with"
205 " existing node %s" % existing_node.name,
206 errors.ECODE_NOTUNIQUE)
207
208
209
210 if self.op.readd:
211 assert existing_node_info is not None, \
212 "Can't retrieve locked node %s" % node_name
213 for attr in self._NFLAGS:
214 if getattr(self.op, attr) is None:
215 setattr(self.op, attr, getattr(existing_node_info, attr))
216 else:
217 for attr in self._NFLAGS:
218 if getattr(self.op, attr) is None:
219 setattr(self.op, attr, True)
220
221 if self.op.readd and not self.op.vm_capable:
222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
223 if pri or sec:
224 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
225 " flag set to false, but it already holds"
226 " instances" % node_name,
227 errors.ECODE_STATE)
228
    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # check reachability of the new node's primary IP
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from the master's secondary ip to the new
      # node's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # the node is not yet in the configuration, so we can only talk to it
    # through a DNS-resolving RPC runner
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: self.node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

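  # NOTE: the helper _InitOpenVSwitch was elided from this excerpt; Exec below
  # calls it after node verification. A plausible sketch following the OVS
  # ndparams and the node_configure_ovs RPC (details are assumptions):
  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")
      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")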

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
342 """Update the SSH setup of all nodes after adding a new node.
343
344 @type readd: boolean
345 @param readd: whether or not this node is readded
346
347 """
348 potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
349 master_node = self.cfg.GetMasterNode()
350
351 if readd:
352
353 master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
354 remove_result = rpcrunner.call_node_ssh_key_remove(
355 [master_node],
356 new_node_uuid, new_node_name,
357 master_candidate_uuids,
358 potential_master_candidates,
359 True,
360 True,
361 False,
362 True,
363 True)
364 remove_result[master_node].Raise(
365 "Could not remove SSH keys of node %s before readding,"
366 " (UUID: %s)." % (new_node_name, new_node_uuid))
367 WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)
368
369 result = rpcrunner.call_node_ssh_key_add(
370 [master_node], new_node_uuid, new_node_name,
371 potential_master_candidates,
372 is_master_candidate, is_potential_master_candidate,
373 is_potential_master_candidate)
374
375 result[master_node].Raise("Could not update the node's SSH setup.")
376 WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
377

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # we are adding a new node, so we assume it is powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained flags; we need to reset them
    # here, otherwise the offline state would prevent RPC calls later in the
    # procedure; this also means that if the re-add fails, we are left with
    # a node that is not 'repaired'
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")

    self.new_node.master_candidate = self.master_candidate
    if self.changed_primary_ip:
      self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # add the node to our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
          master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
          self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      }

    result = self.rpc.call_node_verify(
        node_verifier_uuids, node_verify_param,
        self.cfg.GetClusterName(),
        self.cfg.GetClusterInfo().hvparams,
        {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
        self.cfg.GetAllNodeGroupsInfoDict()
      )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # we create a new certificate even if the node is readded
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

    # update the SSH setup of the new node and the potential master candidates
    if self.op.node_setup:
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # boolean value that tells us whether we might be demoting this node
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

600 """Check prerequisites.
601
602 This only checks the instance list against the existing names.
603
604 """
605 node = self.cfg.GetNodeInfo(self.op.node_uuid)
606 if self.lock_instances:
607 affected_instances = \
608 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
609
610
611 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
612 wanted_instance_names = frozenset([inst.name for inst in
613 affected_instances.values()])
614 if wanted_instance_names - owned_instance_names:
615 raise errors.OpPrereqError("Instances affected by changing node %s's"
616 " secondary IP address have changed since"
617 " locks were acquired, wanted '%s', have"
618 " '%s'; retry the operation" %
619 (node.name,
620 utils.CommaJoin(wanted_instance_names),
621 utils.CommaJoin(owned_instance_names)),
622 errors.ECODE_STATE)
623 else:
624 affected_instances = None
625
626 if (self.op.master_candidate is not None or
627 self.op.drained is not None or
628 self.op.offline is not None):
629
630 if node.uuid == self.cfg.GetMasterNode():
631 raise errors.OpPrereqError("The master role can be changed"
632 " only via master-failover",
633 errors.ECODE_INVAL)
634
635 if self.op.master_candidate and not node.master_capable:
636 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
637 " it a master candidate" % node.name,
638 errors.ECODE_STATE)
639
640 if self.op.vm_capable is False:
641 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
642 if ipri or isec:
643 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
644 " the vm_capable flag" % node.name,
645 errors.ECODE_STATE)
646
647 if node.master_candidate and self.might_demote and not self.lock_all:
648 assert not self.op.auto_promote, "auto_promote set but lock_all not"
649
650
651 (mc_remaining, mc_should, _) = \
652 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
653 if mc_remaining < mc_should:
654 raise errors.OpPrereqError("Not enough master candidates, please"
655 " pass auto promote option to allow"
656 " promotion (--auto-promote or RAPI"
657 " auto_promote=True)", errors.ECODE_STATE)
658
659 self.old_flags = old_flags = (node.master_candidate,
660 node.drained, node.offline)
661 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
662 self.old_role = old_role = self._F2R[old_flags]
663
664
665 for attr in self._FLAGS:
666 if getattr(self.op, attr) is False and getattr(node, attr) is False:
667 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
668 setattr(self.op, attr, None)
669
670
671
672
673
674 if SupportsOob(self.cfg, node):
675 if self.op.offline is False and not (node.powered or
676 self.op.powered is True):
677 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
678 " offline status can be reset") %
679 self.op.node_name, errors.ECODE_STATE)
680 elif self.op.powered is not None:
681 raise errors.OpPrereqError(("Unable to change powered state for node %s"
682 " as it does not support out-of-band"
683 " handling") % self.op.node_name,
684 errors.ECODE_STATE)
685
686
687 if (self.op.drained is False or self.op.offline is False or
688 (self.op.master_capable and not node.master_capable)):
689 if _DecideSelfPromotion(self):
690 self.op.master_candidate = True
691 self.LogInfo("Auto-promoting node to master candidate")
692
693
694 if self.op.master_capable is False and node.master_candidate:
695 if self.op.node_uuid == self.cfg.GetMasterNode():
696 raise errors.OpPrereqError("Master must remain master capable",
697 errors.ECODE_STATE)
698 self.LogInfo("Demoting from master candidate")
699 self.op.master_candidate = False
700
701
702 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
703 if self.op.master_candidate:
704 new_role = self._ROLE_CANDIDATE
705 elif self.op.drained:
706 new_role = self._ROLE_DRAINED
707 elif self.op.offline:
708 new_role = self._ROLE_OFFLINE
709 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
710
711
712 new_role = self._ROLE_REGULAR
713 else:
714 new_role = old_role
715
716 self.new_role = new_role
717
718 if old_role == self._ROLE_OFFLINE and new_role != old_role:
719
720 result = self.rpc.call_version([node.uuid])[node.uuid]
721 if result.fail_msg:
722 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
723 " to report its version: %s" %
724 (node.name, result.fail_msg),
725 errors.ECODE_STATE)
726 else:
727 self.LogWarning("Transitioning node from offline to online state"
728 " without using re-add. Please make sure the node"
729 " is healthy!")
730
731
732
733
734 if self.op.secondary_ip:
735
736 master = self.cfg.GetMasterNodeInfo()
737 master_singlehomed = master.secondary_ip == master.primary_ip
738 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
739 if self.op.force and node.uuid == master.uuid:
740 self.LogWarning("Transitioning from single-homed to multi-homed"
741 " cluster; all nodes will require a secondary IP"
742 " address")
743 else:
744 raise errors.OpPrereqError("Changing the secondary ip on a"
745 " single-homed cluster requires the"
746 " --force option to be passed, and the"
747 " target node to be the master",
748 errors.ECODE_INVAL)
749 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
750 if self.op.force and node.uuid == master.uuid:
751 self.LogWarning("Transitioning from multi-homed to single-homed"
752 " cluster; secondary IP addresses will have to be"
753 " removed")
754 else:
755 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
756 " same as the primary IP on a multi-homed"
757 " cluster, unless the --force option is"
758 " passed, and the target node is the"
759 " master", errors.ECODE_INVAL)
760
761 assert not (set([inst.name for inst in affected_instances.values()]) -
762 self.owned_locks(locking.LEVEL_INSTANCE))
763
764 if node.offline:
765 if affected_instances:
766 msg = ("Cannot change secondary IP address: offline node has"
767 " instances (%s) configured to use it" %
768 utils.CommaJoin(
769 [inst.name for inst in affected_instances.values()]))
770 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
771 else:
772
773
774 for instance in affected_instances.values():
775 CheckInstanceState(self, instance, INSTANCE_DOWN,
776 msg="cannot change secondary ip")
777
778 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
779 if master.uuid != node.uuid:
780
781 if not netutils.TcpPing(self.op.secondary_ip,
782 constants.DEFAULT_NODED_PORT,
783 source=master.secondary_ip):
784 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
785 " based ping to node daemon port",
786 errors.ECODE_ENVIRON)
787
788 if self.op.ndparams:
789 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
790 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
791 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
792 "node", "cluster or group")
793 self.new_ndparams = new_ndparams
794
795 if self.op.hv_state:
796 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
797 node.hv_state_static)
798
799 if self.op.disk_state:
800 self.new_disk_state = \
801 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
802
  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # tell the node to demote itself, if no longer MC and not offline;
      # this must be done only after the configuration is updated so that
      # the node won't receive any further configuration updates
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, so adjust the candidate pool as needed
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # if the node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if the node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

    # this will trigger job queue propagation or cleanup if the mc
    # flag is changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    if self.cfg.GetClusterInfo().modify_ssh_setup:
      potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
      master_node = self.cfg.GetMasterNode()
      if self.old_role == self._ROLE_CANDIDATE:
        master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
        ssh_result = self.rpc.call_node_ssh_key_remove(
          [master_node],
          node.uuid, node.name,
          master_candidate_uuids, potential_master_candidates,
          True,   # from_authorized_keys (flag meanings inferred from the RPC)
          False,  # from_public_keys
          False,  # clear_authorized_keys
          False,  # clear_public_keys
          False)  # readd
        ssh_result[master_node].Raise(
          "Could not adjust the SSH setup after demoting node '%s'"
          " (UUID: %s)." % (node.name, node.uuid))
        WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

      if self.new_role == self._ROLE_CANDIDATE:
        ssh_result = self.rpc.call_node_ssh_key_add(
          [master_node], node.uuid, node.name,
          potential_master_candidates,
          True,   # to_authorized_keys (flag meanings inferred from the RPC)
          True,   # to_public_keys
          False)  # get_public_keys
        ssh_result[master_node].Raise(
          "Could not update the SSH setup of node '%s' after promotion"
          " (UUID: %s)." % (node.name, node.uuid))
        WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

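  # NOTE: CheckArguments was elided from this excerpt. It presumably refuses
  # to powercycle the master node unless force is given; a sketch (the exact
  # message is an assumption):
  def CheckArguments(self):
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)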

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


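# NOTE: the helper _GetNodeInstancesInner was elided from this excerpt, but
# the functions below call it. A minimal sketch consistent with those call
# sites (filter all configured instances with the given predicate):
def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]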


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


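# NOTE: _GetNodeSecondaryInstances was elided from this excerpt; it is the
# natural counterpart of _GetNodePrimaryInstances and is referenced by the
# evacuation logic below. A plausible sketch (the exact config call is an
# assumption):
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceSecondaryNodes(inst.uuid))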


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceNodes(inst.uuid))


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

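  # NOTE: CheckArguments was elided from this excerpt; given the checks in
  # ExpandNames below, it presumably just validates the iallocator/remote_node
  # pair using the imported helper, e.g.:
  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
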
  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # determine nodes to lock
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # an iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # determine the nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

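  # NOTE: DeclareLocks and _DetermineInstances were elided from this excerpt.
  # CheckPrereq below relies on _DetermineInstances; a plausible sketch,
  # dispatching on the evacuation mode (details are assumptions):
  def _DetermineInstances(self):
    """Builds the list of instances to operate on.

    """
    if self.op.mode == constants.NODE_EVAC_PRI:
      # primary instances only
      inst_fn = _GetNodePrimaryInstances
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances

    return inst_fn(self.cfg, self.op.node_uuid)
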
  def CheckPrereq(self):
    # verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # no instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # ask the iallocator to compute the evacuation solution
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  # (CheckArguments was elided in this excerpt)

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  # (CheckPrereq was elided in this excerpt)

  def Exec(self, feedback_fn):
    # prepare one migration job per primary instance on the node
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # this LU holds only the (shared) node lock; each submitted migration
    # job does its own instance locking when it runs
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


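# NOTE: the helper _GetStorageTypeArgs was elided from this excerpt; it is
# called by the storage LUs below to build the storage-type-specific RPC
# arguments. A plausible sketch (the file-storage special case is an
# assumption):
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []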


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

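  # NOTE: CheckPrereq was elided from this excerpt; presumably it only checks
  # that the storage type is enabled on the cluster, e.g.:
  def CheckPrereq(self):
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
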
  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  # (CheckArguments and ExpandNames were elided in this excerpt)

  def Exec(self, feedback_fn):
    # (the volume-listing body was elided in this excerpt)
    pass


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

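  # NOTE: CheckArguments was elided from this excerpt; presumably it validates
  # the requested output fields against the known storage fields, e.g.:
  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)
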
  # (ExpandNames was elided in this excerpt)

1411 """Determines the default storage type of the cluster.
1412
1413 """
1414 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1415 default_storage_type = \
1416 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1417 return default_storage_type
1418
1420 """Check prerequisites.
1421
1422 """
1423 if self.op.storage_type:
1424 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1425 self.storage_type = self.op.storage_type
1426 else:
1427 self.storage_type = self._DetermineStorageType()
1428 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1429 if self.storage_type not in supported_storage_types:
1430 raise errors.OpPrereqError(
1431 "Storage reporting for storage type '%s' is not supported. Please"
1432 " use the --storage-type option to specify one of the supported"
1433 " storage types (%s) or set the default disk template to one that"
1434 " supports storage reporting." %
1435 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1436
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # always get the name field to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

1513 """Build hooks env.
1514
1515 """
1516 return {
1517 "OP_TARGET": self.op.node_name,
1518 "NODE_NAME": self.op.node_name,
1519 }
1520
1522 """Build hooks nodes.
1523
1524 This doesn't run on the target node in the pre phase as a failed
1525 node would then be impossible to remove.
1526
1527 """
1528 all_nodes = self.cfg.GetNodeList()
1529 try:
1530 all_nodes.remove(self.op.node_uuid)
1531 except ValueError:
1532 pass
1533 return (all_nodes, all_nodes)
1534
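  # NOTE: CheckPrereq was elided from this excerpt. Exec below relies on it
  # to set self.node; a plausible sketch of what it must verify (messages and
  # exact checks are assumptions):
  def CheckPrereq(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)

    if node.uuid == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Node is the master node, failover to"
                                 " another node is required",
                                 errors.ECODE_INVAL)

    for instance in self.cfg.GetAllInstancesInfo().values():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.node = node
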
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    if modify_ssh_setup:
      # retrieve the list of potential master candidates before the node
      # is removed
      potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate,  # from_authorized_keys (names inferred)
        potential_master_candidate,  # from_public_keys
        True,   # clear_authorized_keys
        True,   # clear_public_keys
        False)  # readd
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

    # promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.cfg, self.node)

    # run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # remove the node's certificate from the candidate certificates
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # remove the node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

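  # NOTE: CheckArguments was elided from this excerpt; presumably it expands
  # the node name and verifies that the storage type supports the repair
  # operation, e.g. (the constants used here are assumptions):
  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(self.op.storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % self.op.storage_type,
                                 errors.ECODE_INVAL)
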
  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

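  # NOTE: the helper _CheckFaultyDisks was elided from this excerpt; it is
  # called from CheckPrereq below. A plausible sketch using the imported
  # FindFaultyInstanceDisks helper (the exact message is an assumption):
  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode.

    """
    if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                               node_uuid, True):
      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                 " node '%s'" %
                                 (instance.name,
                                  self.cfg.GetNodeName(node_uuid)),
                                 errors.ECODE_STATE)
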
  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
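    # The body of Exec was elided in this excerpt. A minimal sketch of the
    # repair call it presumably issues, mirroring the storage RPC pattern
    # used by LUNodeModifyStorage above (exact arguments are assumptions):
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))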