31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey
57


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


71 """Ensure that a node has the given secondary ip.
72
73 @type lu: L{LogicalUnit}
74 @param lu: the LU on behalf of which we make the check
75 @type node: L{objects.Node}
76 @param node: the node to check
77 @type secondary_ip: string
78 @param secondary_ip: the ip to check
79 @type prereq: boolean
80 @param prereq: whether to throw a prerequisite or an execute error
81 @raise errors.OpPrereqError: if the node doesn't have the ip,
82 and prereq=True
83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
84
85 """
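  # ask the node daemon itself whether the address is actually configured,
  # rather than trusting the cluster configuration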
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # exclude the node being added; PreparePostHookNodes adds it back for
      # the post-hook phase
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def HooksAbortCallBack(self, phase, feedback_fn, exception):
    """Cleans up if the hooks fail.

    This function runs actions that are necessary to bring the cluster into a
    clean state again. This is necessary if, for example, the hooks of this
    operation failed and left the node in an inconsistent state.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      feedback_fn("Pre operation hook failed. Rolling back preparations.")

      master_node = self.cfg.GetMasterNodeInfo().name
      remove_result = self.rpc.call_node_ssh_key_remove_light(
        [master_node],
        self.op.node_name)
      remove_result[master_node].Raise(
        "Error removing SSH key of node '%s'." % self.op.node_name)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip,
                                 errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)
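    # after this block, None is no longer a valid value for any of the
    # _NFLAGS attributes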
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)
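    # verify that the new node's homing (single or dual) matches the
    # master's, as mixed setups are not supported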
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # check reachability of the new node's primary ip
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from the master's secondary ip to the newbie's
      # secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
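    # the new node is not part of the cluster yet, so it can only be
    # reached via DNS-based RPC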
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch: master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
        [node_name], vparams, cname,
        self.cfg.GetClusterInfo().hvparams,
      )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
                 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
    """Update the SSH setup of all nodes after adding a new node.

    @type readd: boolean
    @param readd: whether or not this node is readded

    """
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    master_node = self.cfg.GetMasterNode()

    if readd:
      # remove the old node's SSH keys first, so that the key distribution
      # below starts from a clean state; the boolean flags select which key
      # files are touched and are defined by the node_ssh_key_remove RPC
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      remove_result = rpcrunner.call_node_ssh_key_remove(
        [master_node],
        new_node_uuid, new_node_name,
        master_candidate_uuids,
        potential_master_candidates,
        True,
        True,
        False,
        True,
        True,
        self.op.debug,
        self.op.verbose)
      remove_result[master_node].Raise(
        "Could not remove SSH keys of node %s before readding,"
        " (UUID: %s)." % (new_node_name, new_node_uuid))
      WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)

    result = rpcrunner.call_node_ssh_key_add(
      [master_node], new_node_uuid, new_node_name,
      potential_master_candidates,
      is_master_candidate, is_potential_master_candidate,
      is_potential_master_candidate, self.op.debug, self.op.verbose)

    result[master_node].Raise("Could not update the node's SSH setup.")
    WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

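    # a newly added node is assumed to be powered on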
    self.new_node.powered = True

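    # when re-adding a node, reset the offline and drained flags; leaving
    # them set would block the RPC calls made later in this method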
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")

    self.new_node.master_candidate = self.master_candidate
    if self.changed_primary_ip:
      self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about a possible promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # add the new node to /etc/hosts on the master, if configured to do so
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node,
                               self.new_node.secondary_ip, False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}, []),
      }

    result = self.rpc.call_node_verify(
      node_verifier_uuids, node_verify_param,
      self.cfg.GetClusterName(),
      self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      # update the re-added node's entry in the configuration
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the node does not keep a stale master candidate state
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.cfg.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # register the node's client certificate as a candidate certificate, or
    # remove a stale entry
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])

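    # distribute the new node's SSH key and update the cluster's SSH setup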
    if self.op.node_setup:
      # the new node is always treated as a potential master candidate here
      self._SshUpdate(self.new_node.uuid, self.new_node.name,
                      self.new_node.master_candidate, True,
                      self.rpc, self.op.readd, feedback_fn)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None
590 """Build hooks env.
591
592 This runs on the master node.
593
594 """
595 return {
596 "OP_TARGET": self.op.node_name,
597 "MASTER_CANDIDATE": str(self.op.master_candidate),
598 "OFFLINE": str(self.op.offline),
599 "DRAINED": str(self.op.drained),
600 "MASTER_CAPABLE": str(self.op.master_capable),
601 "VM_CAPABLE": str(self.op.vm_capable),
602 }
603
605 """Build hooks nodes.
606
607 """
608 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
609 return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # verify instance locks against the affected instances
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node we would have enough
      # master candidates left
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # ignore requests to unset flags that are already unset
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # past this point, any flag change to False means a transition away
    # from the respective state, as only actual changes are kept

    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # if we're being de-drained or de-offlined, or made master-capable, we
    # may need to self-promote to master candidate
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # if the node is no longer master capable, it must be demoted from MC
    if self.op.master_capable is False and node.master_candidate:
      if self.op.node_uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("Master must remain master capable",
                                   errors.ECODE_STATE)
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False
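    # compute the new role; at most one of the flags may be set to True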
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in the flags, which means we're unsetting the
      # (only) True flag
      new_role = self._ROLE_REGULAR
    else:
      # no flag changes, keep the old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # the node is transitioning out of the offline state; verify that it
      # is alive by asking for its version
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # checks related to changing the node's secondary ip
    if self.op.secondary_ip:
      # ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it from the master
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # tell the node to demote itself, if it is no longer a master
      # candidate and not offline; this must be done after the
      # configuration update, so that the node no longer receives it
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, so adjust the candidate pool if needed
      if self.lock_all:
        AdjustCandidatePool(
          self, [node.uuid], master_node, potential_master_candidates,
          feedback_fn, modify_ssh_setup)

      # if the node gets promoted, grant RPC privileges; if it gets
      # demoted, revoke them
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)

      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

      EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

      # the SSH setup only needs to be adjusted when the role changed
      # between master candidate and non-candidate
      if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:

        if modify_ssh_setup:
          if self.old_role == self._ROLE_CANDIDATE:
            master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
            # the five boolean flags select which SSH key files are updated
            # or cleared; their meaning is defined by the
            # node_ssh_key_remove RPC
            ssh_result = self.rpc.call_node_ssh_key_remove(
              [master_node],
              node.uuid, node.name,
              master_candidate_uuids, potential_master_candidates,
              True,
              False,
              False,
              False,
              False,
              self.op.debug,
              self.op.verbose)
            ssh_result[master_node].Raise(
              "Could not adjust the SSH setup after demoting node '%s'"
              " (UUID: %s)." % (node.name, node.uuid))
            WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)

          if self.new_role == self._ROLE_CANDIDATE:
            AddMasterCandidateSshKey(
              self, master_node, node, potential_master_candidates,
              feedback_fn)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                cfg.GetInstanceNodes(inst.uuid))


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

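    # determine the nodes to lock; this is done optimistically here and
    # verified again in CheckPrereq after the locks have been acquired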
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # an iallocator may choose any node in the evacuated node's group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    return set([self.op.node_uuid]) | group_nodes

  def CheckPrereq(self):
    # verify that the locks determined in ExpandNames still match reality
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # no instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # let the iallocator compute the evacuation solution
      req = iallocator.IAReqNodeEvac(
        evac_mode=self.op.mode, instances=list(self.instance_names),
        ignore_soft_errors=self.op.ignore_soft_errors)
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def Exec(self, feedback_fn):
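    # submit one OpInstanceMigrate job per primary instance on the node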
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # we hold only the (shared) lock of the node being evacuated; the
    # submitted migration jobs do their own locking
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # special case for file storage, which wants a list of storage directories
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
1255 """Logical unit for modifying a storage volume on a node.
1256
1257 """
1258 REQ_BGL = False
1259
1261 (self.op.node_uuid, self.op.node_name) = \
1262 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1263
1264 storage_type = self.op.storage_type
1265
1266 try:
1267 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1268 except KeyError:
1269 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1270 " modified" % storage_type,
1271 errors.ECODE_INVAL)
1272
1273 diff = set(self.op.changes.keys()) - modifiable
1274 if diff:
1275 raise errors.OpPrereqError("The following fields can not be modified for"
1276 " storage units of type '%s': %r" %
1277 (storage_type, list(diff)),
1278 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: the set of valid output fields
  @type selected: L{utils.FieldSet}
  @param selected: the set of output fields selected by the user

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
          "Storage reporting for storage type '%s' is not supported. Please"
          " use the --storage-type option to specify one of the supported"
          " storage types (%s) or set the default disk template to one that"
          " supports storage reporting." %
          (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # always fetch the name field, as the results are keyed by it
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    master_node = self.cfg.GetMasterNode()
    potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
    if modify_ssh_setup:
      # check whether the node to be removed is a potential master
      # candidate before it disappears from the configuration
      potential_master_candidate = \
        self.op.node_name in potential_master_candidates
      master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
      # the boolean flags select which SSH key files are updated or
      # cleared; their meaning is defined by the node_ssh_key_remove RPC
      result = self.rpc.call_node_ssh_key_remove(
        [master_node],
        self.node.uuid, self.op.node_name,
        master_candidate_uuids, potential_master_candidates,
        self.node.master_candidate,
        potential_master_candidate,
        True,
        True,
        False,
        self.op.debug,
        self.op.verbose)
      result[master_node].Raise(
        "Could not remove the SSH key of node '%s' (UUID: %s)." %
        (self.op.node_name, self.node.uuid))
      WarnAboutFailedSshUpdates(result, master_node, feedback_fn)

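    # adjust the candidate pool, as the removed node may have been a
    # master candidate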
    AdjustCandidatePool(
      self, [self.node.uuid], master_node, potential_master_candidates,
      feedback_fn, modify_ssh_setup)
    self.cfg.RemoveNode(self.node.uuid)

    # run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the configuration
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # remove the node's certificate from the candidate certificates
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # remove the node from the /etc/hosts file of the cluster
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):