31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey
57
58
68
69
71 """Ensure that a node has the given secondary ip.
72
73 @type lu: L{LogicalUnit}
74 @param lu: the LU on behalf of which we make the check
75 @type node: L{objects.Node}
76 @param node: the node to check
77 @type secondary_ip: string
78 @param secondary_ip: the ip to check
79 @type prereq: boolean
80 @param prereq: whether to throw a prerequisite or an execute error
81 @raise errors.OpPrereqError: if the node doesn't have the ip,
82 and prereq=True
83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
84
85 """
86
87
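# Ask the node itself whether it has the given IP configured; any RPC
# failure is reported as an environment error.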
88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
89 result.Raise("Failure checking secondary ip on node %s" % node.name,
90 prereq=prereq, ecode=errors.ECODE_ENVIRON)
91 if not result.payload:
92 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
93 " please fix and re-run this command" % secondary_ip)
94 if prereq:
95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
96 else:
97 raise errors.OpExecError(msg)
98
99
101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
122
124 """Build hooks env.
125
126 This will run on all nodes before, and on all nodes + the new node after.
127
128 """
129 return {
130 "OP_TARGET": self.op.node_name,
131 "NODE_NAME": self.op.node_name,
132 "NODE_PIP": self.op.primary_ip,
133 "NODE_SIP": self.op.secondary_ip,
134 "MASTER_CAPABLE": str(self.op.master_capable),
135 "VM_CAPABLE": str(self.op.vm_capable),
136 }
137
139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145
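# Exclude the node being added from the hook nodes computed here;
# PreparePostHookNodes adds it back for the "post" phase.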
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148
149 return (hook_nodes, hook_nodes)
150
151 def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
155 """Cleans up if the hooks fail.
156
157 This function runs actions that are necessary to bring the cluster back
158 into a clean state. This is needed if, for example, the hooks of this
159 operation failed and left the node in an inconsistent state.
160
161 """
162 if phase == constants.HOOKS_PHASE_PRE:
163 feedback_fn("Pre operation hook failed. Rolling back preparations.")
164
165 master_node = self.cfg.GetMasterNodeInfo().name
166 remove_result = self.rpc.call_node_ssh_key_remove_light(
167 [master_node],
168 self.op.node_name)
169 remove_result[master_node].Raise(
170 "Error removing SSH key of node '%s'." % self.op.node_name)
171
173 """Check prerequisites.
174
175 This checks:
176 - the new node is not already in the config
177 - it is resolvable
178 - its parameters (single/dual homed) match the cluster
179
180 Any errors are signaled by raising errors.OpPrereqError.
181
182 """
183 node_name = self.hostname.name
184 self.op.primary_ip = self.hostname.ip
185 if self.op.secondary_ip is None:
186 if self.primary_ip_family == netutils.IP6Address.family:
187 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
188 " IPv4 address must be given as secondary",
189 errors.ECODE_INVAL)
190 self.op.secondary_ip = self.op.primary_ip
191
192 secondary_ip = self.op.secondary_ip
193 if not netutils.IP4Address.IsValid(secondary_ip):
194 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
195 " address" % secondary_ip, errors.ECODE_INVAL)
196
197 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
198 if not self.op.readd and existing_node_info is not None:
199 raise errors.OpPrereqError("Node %s is already in the configuration" %
200 node_name, errors.ECODE_EXISTS)
201 elif self.op.readd and existing_node_info is None:
202 raise errors.OpPrereqError("Node %s is not in the configuration" %
203 node_name, errors.ECODE_NOENT)
204
205 self.changed_primary_ip = False
206
207 for existing_node in self.cfg.GetAllNodesInfo().values():
208 if self.op.readd and node_name == existing_node.name:
209 if existing_node.secondary_ip != secondary_ip:
210 raise errors.OpPrereqError("Readded node doesn't have the same IP"
211 " address configuration as before",
212 errors.ECODE_INVAL)
213 if existing_node.primary_ip != self.op.primary_ip:
214 self.changed_primary_ip = True
215
216 continue
217
218 if (existing_node.primary_ip == self.op.primary_ip or
219 existing_node.secondary_ip == self.op.primary_ip or
220 existing_node.primary_ip == secondary_ip or
221 existing_node.secondary_ip == secondary_ip):
222 raise errors.OpPrereqError("New node ip address(es) conflict with"
223 " existing node %s" % existing_node.name,
224 errors.ECODE_NOTUNIQUE)
225
226
227
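# For a re-add, inherit any unspecified node flags from the existing node;
# for a new node, default them to True.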
228 if self.op.readd:
229 assert existing_node_info is not None, \
230 "Can't retrieve locked node %s" % node_name
231 for attr in self._NFLAGS:
232 if getattr(self.op, attr) is None:
233 setattr(self.op, attr, getattr(existing_node_info, attr))
234 else:
235 for attr in self._NFLAGS:
236 if getattr(self.op, attr) is None:
237 setattr(self.op, attr, True)
238
239 if self.op.readd and not self.op.vm_capable:
240 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
241 if pri or sec:
242 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
243 " flag set to false, but it already holds"
244 " instances" % node_name,
245 errors.ECODE_STATE)
246
247
248
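# The new node has to match the cluster's single-homed/multi-homed setup:
# either both the master and the new node have a separate secondary IP, or
# neither does.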
249 myself = self.cfg.GetMasterNodeInfo()
250 master_singlehomed = myself.secondary_ip == myself.primary_ip
251 newbie_singlehomed = secondary_ip == self.op.primary_ip
252 if master_singlehomed != newbie_singlehomed:
253 if master_singlehomed:
254 raise errors.OpPrereqError("The master has no secondary ip but the"
255 " new node has one",
256 errors.ECODE_INVAL)
257 else:
258 raise errors.OpPrereqError("The master has a secondary ip but the"
259 " new node doesn't have one",
260 errors.ECODE_INVAL)
261
262
263 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
264 raise errors.OpPrereqError("Node not reachable by ping",
265 errors.ECODE_ENVIRON)
266
267 if not newbie_singlehomed:
268
269 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
270 source=myself.secondary_ip):
271 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
272 " based ping to node daemon port",
273 errors.ECODE_ENVIRON)
274
275 if self.op.readd:
276 exceptions = [existing_node_info.uuid]
277 else:
278 exceptions = []
279
280 if self.op.master_capable:
281 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
282 else:
283 self.master_candidate = False
284
285 self.node_group = None
286 if self.op.readd:
287 self.new_node = existing_node_info
288 self.node_group = existing_node_info.group
289 else:
290 self.node_group = self.cfg.LookupNodeGroup(self.op.group)
291 self.new_node = objects.Node(name=node_name,
292 primary_ip=self.op.primary_ip,
293 secondary_ip=secondary_ip,
294 master_candidate=self.master_candidate,
295 offline=False, drained=False,
296 group=self.node_group, ndparams={})
297
298 if self.op.ndparams:
299 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
300 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
301 "node", "cluster or group")
302
303 if self.op.hv_state:
304 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
305
306 if self.op.disk_state:
307 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
308
309
310
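# Talk to the new node via a DNS-only RPC runner (it is not in the
# configuration yet) and insist on an exact protocol version match.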
311 rpcrunner = rpc.DnsOnlyRunner()
312 result = rpcrunner.call_version([node_name])[node_name]
313 result.Raise("Can't get version information from node %s" % node_name,
314 prereq=True, ecode=errors.ECODE_ENVIRON)
315 if constants.PROTOCOL_VERSION == result.payload:
316 logging.info("Communication to node %s fine, sw version %s match",
317 node_name, result.payload)
318 else:
319 raise errors.OpPrereqError("Version mismatch master version %s,"
320 " node version %s" %
321 (constants.PROTOCOL_VERSION, result.payload),
322 errors.ECODE_ENVIRON)
323
324 vg_name = self.cfg.GetVGName()
325 if vg_name is not None:
326 vparams = {constants.NV_PVLIST: [vg_name]}
327 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
328 cname = self.cfg.GetClusterName()
329 result = rpcrunner.call_node_verify_light(
330 [node_name], vparams, cname,
331 self.cfg.GetClusterInfo().hvparams,
332 )[node_name]
333 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
334 if errmsgs:
335 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
336 "; ".join(errmsgs), errors.ECODE_ENVIRON)
337
355
356 def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
357 is_potential_master_candidate, rpcrunner, readd, feedback_fn):
358 """Update the SSH setup of all nodes after adding a new node.
359
360 @type readd: boolean
361 @param readd: whether or not this node is readded
362
363 """
364 potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
365 master_node = self.cfg.GetMasterNode()
366
367 if readd:
368
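# On re-add, first have the master clean out any stale SSH keys of this
# node before distributing fresh ones below.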
369 master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
370 remove_result = rpcrunner.call_node_ssh_key_remove(
371 [master_node],
372 new_node_uuid, new_node_name,
373 master_candidate_uuids,
374 potential_master_candidates,
375 True,
376 True,
377 False,
378 True,
379 True,
380 self.op.debug,
381 self.op.verbose)
382 remove_result[master_node].Raise(
383 "Could not remove SSH keys of node %s before readding,"
384 " (UUID: %s)." % (new_node_name, new_node_uuid))
385 WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)
386
387 result = rpcrunner.call_node_ssh_key_add(
388 [master_node], new_node_uuid, new_node_name,
389 potential_master_candidates,
390 is_master_candidate, is_potential_master_candidate,
391 is_potential_master_candidate, self.op.debug, self.op.verbose)
392
393 result[master_node].Raise("Could not update the node's SSH setup.")
394 WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
395
396 def Exec(self, feedback_fn):
397 """Adds the new node to the cluster.
398
399 """
400 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
401 "Not owning BGL"
402
403
404 self.new_node.powered = True
405
406
407
408
409
410 if self.op.readd:
411 self.new_node.offline = False
412 self.new_node.drained = False
413 self.LogInfo("Readding a node, the offline/drained flags were reset")
414
415 self.new_node.master_candidate = self.master_candidate
416 if self.changed_primary_ip:
417 self.new_node.primary_ip = self.op.primary_ip
418
419
420 for attr in self._NFLAGS:
421 setattr(self.new_node, attr, getattr(self.op, attr))
422
423
424 if self.new_node.master_candidate:
425 self.LogInfo("Node will be a master candidate")
426
427 if self.op.ndparams:
428 self.new_node.ndparams = self.op.ndparams
429 else:
430 self.new_node.ndparams = {}
431
432 if self.op.hv_state:
433 self.new_node.hv_state_static = self.new_hv_state
434
435 if self.op.disk_state:
436 self.new_node.disk_state_static = self.new_disk_state
437
438
439 if self.cfg.GetClusterInfo().modify_etc_hosts:
440 master_node = self.cfg.GetMasterNode()
441 result = self.rpc.call_etc_hosts_modify(
442 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
443 self.hostname.ip)
444 result.Raise("Can't update hosts file with new host data")
445
446 if self.new_node.secondary_ip != self.new_node.primary_ip:
447 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
448 False)
449
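# Verify that the master can reach the new node via SSH and resolve its
# hostname before the node is fully added.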
450 node_verifier_uuids = [self.cfg.GetMasterNode()]
451 node_verify_param = {
452 constants.NV_NODELIST: ([self.new_node.name], {}, []),
453
454 }
455
456 result = self.rpc.call_node_verify(
457 node_verifier_uuids, node_verify_param,
458 self.cfg.GetClusterName(),
459 self.cfg.GetClusterInfo().hvparams)
460 for verifier in node_verifier_uuids:
461 result[verifier].Raise("Cannot communicate with node %s" % verifier)
462 nl_payload = result[verifier].payload[constants.NV_NODELIST]
463 if nl_payload:
464 for failed in nl_payload:
465 feedback_fn("ssh/hostname verification failed"
466 " (checking from %s): %s" %
467 (verifier, nl_payload[failed]))
468 raise errors.OpExecError("ssh/hostname verification failed")
469
470 self._InitOpenVSwitch()
471
472 if self.op.readd:
473 RedistributeAncillaryFiles(self)
474
475 self.cfg.Update(self.new_node, feedback_fn)
476
477 if not self.new_node.master_candidate:
478 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
479 result.Warn("Node failed to demote itself from master candidate status",
480 self.LogWarning)
481 else:
482 self.cfg.AddNode(self.new_node, self.proc.GetECId())
483 RedistributeAncillaryFiles(self)
484
485
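# Keep the candidate certificate map in sync: only master candidates have
# their client certificate digest recorded.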
486 digest = GetClientCertDigest(self, self.new_node.uuid)
487 if self.new_node.master_candidate:
488 self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
489 else:
490 self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
491
492
493 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid],
494 silent_stop=True)
495
496
497 if self.op.node_setup:
498
499 self._SshUpdate(self.new_node.uuid, self.new_node.name,
500 self.new_node.master_candidate, True,
501 self.rpc, self.op.readd, feedback_fn)
502
503
505 """Modifies the parameters of a node.
506
507 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
508 to the node role (as _ROLE_*)
509 @cvar _R2F: a dictionary from node role to tuples of flags
510 @cvar _FLAGS: a list of attribute names corresponding to the flags
511
512 """
513 HPATH = "node-modify"
514 HTYPE = constants.HTYPE_NODE
515 REQ_BGL = False
516 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
517 _F2R = {
518 (True, False, False): _ROLE_CANDIDATE,
519 (False, True, False): _ROLE_DRAINED,
520 (False, False, True): _ROLE_OFFLINE,
521 (False, False, False): _ROLE_REGULAR,
522 }
523 _R2F = dict((v, k) for k, v in _F2R.items())
524 _FLAGS = ["master_candidate", "drained", "offline"]
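# Example: the flags (master_candidate=True, drained=False, offline=False)
# map to _ROLE_CANDIDATE via _F2R; _R2F is the inverse mapping and is used
# in Exec to turn a newly computed role back into the individual flags.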
525
527 (self.op.node_uuid, self.op.node_name) = \
528 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
529 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
530 self.op.master_capable, self.op.vm_capable,
531 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
532 self.op.disk_state]
533 if all_mods.count(None) == len(all_mods):
534 raise errors.OpPrereqError("Please pass at least one modification",
535 errors.ECODE_INVAL)
536 if all_mods.count(True) > 1:
537 raise errors.OpPrereqError("Can't set the node into more than one"
538 " state at the same time",
539 errors.ECODE_INVAL)
540
541
542 self.might_demote = (self.op.master_candidate is False or
543 self.op.offline is True or
544 self.op.drained is True or
545 self.op.master_capable is False)
546
547 if self.op.secondary_ip:
548 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
549 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
550 " address" % self.op.secondary_ip,
551 errors.ECODE_INVAL)
552
553 self.lock_all = self.op.auto_promote and self.might_demote
554 self.lock_instances = self.op.secondary_ip is not None
555
564
590
592 """Build hooks env.
593
594 This runs on the master node.
595
596 """
597 return {
598 "OP_TARGET": self.op.node_name,
599 "MASTER_CANDIDATE": str(self.op.master_candidate),
600 "OFFLINE": str(self.op.offline),
601 "DRAINED": str(self.op.drained),
602 "MASTER_CAPABLE": str(self.op.master_capable),
603 "VM_CAPABLE": str(self.op.vm_capable),
604 }
605
607 """Build hooks nodes.
608
609 """
610 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
611 return (nl, nl)
612
614 """Check prerequisites.
615
616 This only checks the instance list against the existing names.
617
618 """
619 node = self.cfg.GetNodeInfo(self.op.node_uuid)
620 if self.lock_instances:
621 affected_instances = \
622 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
623
624
625 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
626 wanted_instance_names = frozenset([inst.name for inst in
627 affected_instances.values()])
628 if wanted_instance_names - owned_instance_names:
629 raise errors.OpPrereqError("Instances affected by changing node %s's"
630 " secondary IP address have changed since"
631 " locks were acquired, wanted '%s', have"
632 " '%s'; retry the operation" %
633 (node.name,
634 utils.CommaJoin(wanted_instance_names),
635 utils.CommaJoin(owned_instance_names)),
636 errors.ECODE_STATE)
637 else:
638 affected_instances = None
639
640 if (self.op.master_candidate is not None or
641 self.op.drained is not None or
642 self.op.offline is not None):
643
644 if node.uuid == self.cfg.GetMasterNode():
645 raise errors.OpPrereqError("The master role can be changed"
646 " only via master-failover",
647 errors.ECODE_INVAL)
648
649 if self.op.master_candidate and not node.master_capable:
650 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
651 " it a master candidate" % node.name,
652 errors.ECODE_STATE)
653
654 if self.op.vm_capable is False:
655 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
656 if ipri or isec:
657 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
658 " the vm_capable flag" % node.name,
659 errors.ECODE_STATE)
660
661 if node.master_candidate and self.might_demote and not self.lock_all:
662 assert not self.op.auto_promote, "auto_promote set but lock_all not"
663
664
665 (mc_remaining, mc_should, _) = \
666 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
667 if mc_remaining < mc_should:
668 raise errors.OpPrereqError("Not enough master candidates, please"
669 " pass auto promote option to allow"
670 " promotion (--auto-promote or RAPI"
671 " auto_promote=True)", errors.ECODE_STATE)
672
673 self.old_flags = old_flags = (node.master_candidate,
674 node.drained, node.offline)
675 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
676 self.old_role = old_role = self._F2R[old_flags]
677
678
679 for attr in self._FLAGS:
680 if getattr(self.op, attr) is False and getattr(node, attr) is False:
681 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
682 setattr(self.op, attr, None)
683
684
685
686
687
688 if SupportsOob(self.cfg, node):
689 if self.op.offline is False and not (node.powered or
690 self.op.powered is True):
691 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
692 " offline status can be reset") %
693 self.op.node_name, errors.ECODE_STATE)
694 elif self.op.powered is not None:
695 raise errors.OpPrereqError(("Unable to change powered state for node %s"
696 " as it does not support out-of-band"
697 " handling") % self.op.node_name,
698 errors.ECODE_STATE)
699
700
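# When un-draining/un-offlining a node or making it master capable, check
# whether it should be auto-promoted to master candidate.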
701 if (self.op.drained is False or self.op.offline is False or
702 (self.op.master_capable and not node.master_capable)):
703 if _DecideSelfPromotion(self):
704 self.op.master_candidate = True
705 self.LogInfo("Auto-promoting node to master candidate")
706
707
708 if self.op.master_capable is False and node.master_candidate:
709 if self.op.node_uuid == self.cfg.GetMasterNode():
710 raise errors.OpPrereqError("Master must remain master capable",
711 errors.ECODE_STATE)
712 self.LogInfo("Demoting from master candidate")
713 self.op.master_candidate = False
714
715
716 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
717 if self.op.master_candidate:
718 new_role = self._ROLE_CANDIDATE
719 elif self.op.drained:
720 new_role = self._ROLE_DRAINED
721 elif self.op.offline:
722 new_role = self._ROLE_OFFLINE
723 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
724
725
726 new_role = self._ROLE_REGULAR
727 else:
728 new_role = old_role
729
730 self.new_role = new_role
731
732 if old_role == self._ROLE_OFFLINE and new_role != old_role:
733
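# De-offlining a node: verify that its node daemon responds, and warn that
# bringing it online without a re-add is at the admin's own risk.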
734 result = self.rpc.call_version([node.uuid])[node.uuid]
735 if result.fail_msg:
736 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
737 " to report its version: %s" %
738 (node.name, result.fail_msg),
739 errors.ECODE_STATE)
740 else:
741 self.LogWarning("Transitioning node from offline to online state"
742 " without using re-add. Please make sure the node"
743 " is healthy!")
744
745
746
747
748 if self.op.secondary_ip:
749
750 master = self.cfg.GetMasterNodeInfo()
751 master_singlehomed = master.secondary_ip == master.primary_ip
752 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
753 if self.op.force and node.uuid == master.uuid:
754 self.LogWarning("Transitioning from single-homed to multi-homed"
755 " cluster; all nodes will require a secondary IP"
756 " address")
757 else:
758 raise errors.OpPrereqError("Changing the secondary ip on a"
759 " single-homed cluster requires the"
760 " --force option to be passed, and the"
761 " target node to be the master",
762 errors.ECODE_INVAL)
763 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
764 if self.op.force and node.uuid == master.uuid:
765 self.LogWarning("Transitioning from multi-homed to single-homed"
766 " cluster; secondary IP addresses will have to be"
767 " removed")
768 else:
769 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
770 " same as the primary IP on a multi-homed"
771 " cluster, unless the --force option is"
772 " passed, and the target node is the"
773 " master", errors.ECODE_INVAL)
774
775 assert not (set([inst.name for inst in affected_instances.values()]) -
776 self.owned_locks(locking.LEVEL_INSTANCE))
777
778 if node.offline:
779 if affected_instances:
780 msg = ("Cannot change secondary IP address: offline node has"
781 " instances (%s) configured to use it" %
782 utils.CommaJoin(
783 [inst.name for inst in affected_instances.values()]))
784 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
785 else:
786
787
788 for instance in affected_instances.values():
789 CheckInstanceState(self, instance, INSTANCE_DOWN,
790 msg="cannot change secondary ip")
791
792 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
793 if master.uuid != node.uuid:
794
795 if not netutils.TcpPing(self.op.secondary_ip,
796 constants.DEFAULT_NODED_PORT,
797 source=master.secondary_ip):
798 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
799 " based ping to node daemon port",
800 errors.ECODE_ENVIRON)
801
802 if self.op.ndparams:
803 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
804 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
805 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
806 "node", "cluster or group")
807 self.new_ndparams = new_ndparams
808
809 if self.op.hv_state:
810 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
811 node.hv_state_static)
812
813 if self.op.disk_state:
814 self.new_disk_state = \
815 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
816
817 def Exec(self, feedback_fn):
818 """Modifies a node.
819
820 """
821 node = self.cfg.GetNodeInfo(self.op.node_uuid)
822 result = []
823
824 if self.op.ndparams:
825 node.ndparams = self.new_ndparams
826
827 if self.op.powered is not None:
828 node.powered = self.op.powered
829
830 if self.op.hv_state:
831 node.hv_state_static = self.new_hv_state
832
833 if self.op.disk_state:
834 node.disk_state_static = self.new_disk_state
835
836 for attr in ["master_capable", "vm_capable"]:
837 val = getattr(self.op, attr)
838 if val is not None:
839 setattr(node, attr, val)
840 result.append((attr, str(val)))
841
842 if self.op.secondary_ip:
843 node.secondary_ip = self.op.secondary_ip
844 result.append(("secondary_ip", self.op.secondary_ip))
845
846
847 self.cfg.Update(node, feedback_fn)
848 master_node = self.cfg.GetMasterNode()
849 potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
850 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
851
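# Apply a role change by translating the new role back into the individual
# flags and recording every flag that actually changed.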
852 if self.new_role != self.old_role:
853 new_flags = self._R2F[self.new_role]
854 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
855 if of != nf:
856 result.append((desc, str(nf)))
857 (node.master_candidate, node.drained, node.offline) = new_flags
858 self.cfg.Update(node, feedback_fn)
859
860
861
862
863 if self.old_role == self._ROLE_CANDIDATE and \
864 self.new_role != self._ROLE_OFFLINE:
865 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
866 if msg:
867 self.LogWarning("Node failed to demote itself: %s", msg)
868
869
870 if self.lock_all:
871 AdjustCandidatePool(
872 self, [node.uuid], master_node, potential_master_candidates,
873 feedback_fn, modify_ssh_setup)
874
875
876 if self.new_role == self._ROLE_CANDIDATE:
877 AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
878
879 if self.old_role == self._ROLE_CANDIDATE:
880 RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)
881
882
883 silent_stop = constants.HT_KVM not in \
884 self.cfg.GetClusterInfo().enabled_hypervisors
885 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid],
886 silent_stop=silent_stop)
887
888
889
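# If the master-candidate status changed in either direction, the SSH key
# distribution needs to be updated as well.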
890 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
891
892 if modify_ssh_setup:
893 if self.old_role == self._ROLE_CANDIDATE:
894 master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
895 ssh_result = self.rpc.call_node_ssh_key_remove(
896 [master_node],
897 node.uuid, node.name,
898 master_candidate_uuids, potential_master_candidates,
899 True,
900 False,
901 False,
902 False,
903 False,
904 self.op.debug,
905 self.op.verbose)
906 ssh_result[master_node].Raise(
907 "Could not adjust the SSH setup after demoting node '%s'"
908 " (UUID: %s)." % (node.name, node.uuid))
909 WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)
910
911 if self.new_role == self._ROLE_CANDIDATE:
912 AddMasterCandidateSshKey(
913 self, master_node, node, potential_master_candidates, feedback_fn)
914
915 return result
916
917
919 """Powercycles a node.
920
921 """
922 REQ_BGL = False
923
932
934 """Locking for PowercycleNode.
935
936 This is a last-resort option and shouldn't block on other
937 jobs. Therefore, we grab no locks.
938
939 """
940 self.needed_locks = {}
941
942 def Exec(self, feedback_fn):
943 """Reboots a node.
944
945 """
946 default_hypervisor = self.cfg.GetHypervisorType()
947 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
948 result = self.rpc.call_node_powercycle(self.op.node_uuid,
949 default_hypervisor,
950 hvparams)
951 result.Raise("Failed to schedule the reboot")
952 return result.payload
953
954
957
958
960 """Returns primary instances on a node.
961
962 """
963 return _GetNodeInstancesInner(cfg,
964 lambda inst: node_uuid == inst.primary_node)
965
966
974
975
977 """Returns a list of all primary and secondary instances on a node.
978
979 """
980
981 return _GetNodeInstancesInner(cfg,
982 lambda inst: node_uuid in
983 cfg.GetInstanceNodes(inst.uuid))
984
985
987 """Evacuates instances off a list of nodes.
988
989 """
990 REQ_BGL = False
991
994
996 (self.op.node_uuid, self.op.node_name) = \
997 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
998
999 if self.op.remote_node is not None:
1000 (self.op.remote_node_uuid, self.op.remote_node) = \
1001 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1002 self.op.remote_node)
1003 assert self.op.remote_node
1004
1005 if self.op.node_uuid == self.op.remote_node_uuid:
1006 raise errors.OpPrereqError("Can not use evacuated node as a new"
1007 " secondary node", errors.ECODE_INVAL)
1008
1009 if self.op.mode != constants.NODE_EVAC_SEC:
1010 raise errors.OpPrereqError("Without the use of an iallocator only"
1011 " secondary instances can be evacuated",
1012 errors.ECODE_INVAL)
1013
1014
1015 self.share_locks = ShareAll()
1016 self.needed_locks = {
1017 locking.LEVEL_INSTANCE: [],
1018 locking.LEVEL_NODEGROUP: [],
1019 locking.LEVEL_NODE: [],
1020 }
1021
1022
1023
1024 self.lock_nodes = self._DetermineNodes()
1025
1027 """Gets the list of node UUIDs to operate on.
1028
1029 """
1030 if self.op.remote_node is None:
1031
1032 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
1033 else:
1034 group_nodes = frozenset([self.op.remote_node_uuid])
1035
1036
1037 return set([self.op.node_uuid]) | group_nodes
1038
1067
1083
1085
1086 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
1087 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
1088 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1089
1090 need_nodes = self._DetermineNodes()
1091
1092 if not owned_nodes.issuperset(need_nodes):
1093 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
1094 " locks were acquired, current nodes are"
1095 " are '%s', used to be '%s'; retry the"
1096 " operation" %
1097 (self.op.node_name,
1098 utils.CommaJoin(need_nodes),
1099 utils.CommaJoin(owned_nodes)),
1100 errors.ECODE_STATE)
1101
1102 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
1103 if owned_groups != wanted_groups:
1104 raise errors.OpExecError("Node groups changed since locks were acquired,"
1105 " current groups are '%s', used to be '%s';"
1106 " retry the operation" %
1107 (utils.CommaJoin(wanted_groups),
1108 utils.CommaJoin(owned_groups)))
1109
1110
1111 self.instances = self._DetermineInstances()
1112 self.instance_names = [i.name for i in self.instances]
1113
1114 if set(self.instance_names) != owned_instance_names:
1115 raise errors.OpExecError("Instances on node '%s' changed since locks"
1116 " were acquired, current instances are '%s',"
1117 " used to be '%s'; retry the operation" %
1118 (self.op.node_name,
1119 utils.CommaJoin(self.instance_names),
1120 utils.CommaJoin(owned_instance_names)))
1121
1122 if self.instance_names:
1123 self.LogInfo("Evacuating instances from node '%s': %s",
1124 self.op.node_name,
1125 utils.CommaJoin(utils.NiceSort(self.instance_names)))
1126 else:
1127 self.LogInfo("No instances to evacuate from node '%s'",
1128 self.op.node_name)
1129
1130 if self.op.remote_node is not None:
1131 for i in self.instances:
1132 if i.primary_node == self.op.remote_node_uuid:
1133 raise errors.OpPrereqError("Node %s is the primary node of"
1134 " instance %s, cannot use it as"
1135 " secondary" %
1136 (self.op.remote_node, i.name),
1137 errors.ECODE_INVAL)
1138
1139 def Exec(self, feedback_fn):
1140 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1141
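# Build the evacuation jobs: either let the iallocator plan the moves or
# create per-instance replace-disks jobs targeting the given remote node.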
1142 if not self.instance_names:
1143
1144 jobs = []
1145
1146 elif self.op.iallocator is not None:
1147
1148 req = iallocator.IAReqNodeEvac(
1149 evac_mode=self.op.mode, instances=list(self.instance_names),
1150 ignore_soft_errors=self.op.ignore_soft_errors)
1151 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1152
1153 ial.Run(self.op.iallocator)
1154
1155 if not ial.success:
1156 raise errors.OpPrereqError("Can't compute node evacuation using"
1157 " iallocator '%s': %s" %
1158 (self.op.iallocator, ial.info),
1159 errors.ECODE_NORES)
1160
1161 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1162
1163 elif self.op.remote_node is not None:
1164 assert self.op.mode == constants.NODE_EVAC_SEC
1165 jobs = [
1166 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1167 remote_node=self.op.remote_node,
1168 disks=[],
1169 mode=constants.REPLACE_DISK_CHG,
1170 early_release=self.op.early_release)]
1171 for instance_name in self.instance_names]
1172
1173 else:
1174 raise errors.ProgrammerError("No iallocator or remote node")
1175
1176 return ResultWithJobs(jobs)
1177
1178
1180 """Migrate all instances from a node.
1181
1182 """
1183 HPATH = "node-migrate"
1184 HTYPE = constants.HTYPE_NODE
1185 REQ_BGL = False
1186
1189
1191 (self.op.node_uuid, self.op.node_name) = \
1192 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1193
1194 self.share_locks = ShareAll()
1195 self.needed_locks = {
1196 locking.LEVEL_NODE: [self.op.node_uuid],
1197 }
1198
1200 """Build hooks env.
1201
1202 This runs on the master, the primary and all the secondaries.
1203
1204 """
1205 return {
1206 "NODE_NAME": self.op.node_name,
1207 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1208 }
1209
1211 """Build hooks nodes.
1212
1213 """
1214 nl = [self.cfg.GetMasterNode()]
1215 return (nl, nl)
1216
1219
1220 def Exec(self, feedback_fn):
1221
1222 jobs = [
1223 [opcodes.OpInstanceMigrate(
1224 instance_name=inst.name,
1225 mode=self.op.mode,
1226 live=self.op.live,
1227 iallocator=self.op.iallocator,
1228 target_node=self.op.target_node,
1229 allow_runtime_changes=self.op.allow_runtime_changes,
1230 ignore_ipolicy=self.op.ignore_ipolicy)]
1231 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1232
1233
1234
1235
1236
1237
1238 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1239 frozenset([self.op.node_uuid]))
1240
1241 return ResultWithJobs(jobs)
1242
1243
1258
1259
1261 """Logical unit for modifying a storage volume on a node.
1262
1263 """
1264 REQ_BGL = False
1265
1267 (self.op.node_uuid, self.op.node_name) = \
1268 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1269
1270 storage_type = self.op.storage_type
1271
1272 try:
1273 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1274 except KeyError:
1275 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1276 " modified" % storage_type,
1277 errors.ECODE_INVAL)
1278
1279 diff = set(self.op.changes.keys()) - modifiable
1280 if diff:
1281 raise errors.OpPrereqError("The following fields can not be modified for"
1282 " storage units of type '%s': %r" %
1283 (storage_type, list(diff)),
1284 errors.ECODE_INVAL)
1285
1291
1293 self.needed_locks = {
1294 locking.LEVEL_NODE: self.op.node_uuid,
1295 }
1296
1297 def Exec(self, feedback_fn):
1298 """Computes the list of nodes and their attributes.
1299
1300 """
1301 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1302 result = self.rpc.call_storage_modify(self.op.node_uuid,
1303 self.op.storage_type, st_args,
1304 self.op.name, self.op.changes)
1305 result.Raise("Failed to modify storage unit '%s' on %s" %
1306 (self.op.name, self.op.node_name))
1307
1308
1310 """Checks whether all selected fields are valid according to fields.
1311
1312 @type fields: L{utils.FieldSet}
1313 @param fields: the set of fields that are valid
1314 @type selected: L{utils.FieldSet}
1315 @param selected: the set of fields selected for output
1316
1317 """
1318 delta = fields.NonMatching(selected)
1319 if delta:
1320 raise errors.OpPrereqError("Unknown output fields selected: %s"
1321 % ",".join(delta), errors.ECODE_INVAL)
1322
1323
1325 """Logical unit for getting volumes on node(s).
1326
1327 """
1328 REQ_BGL = False
1329
1335
1347
1348 def Exec(self, feedback_fn):
1399
1400
1402 """Logical unit for getting information on storage units on node(s).
1403
1404 """
1405 REQ_BGL = False
1406
1410
1422
1424 """Determines the default storage type of the cluster.
1425
1426 """
1427 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1428 default_storage_type = \
1429 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1430 return default_storage_type
1431
1433 """Check prerequisites.
1434
1435 """
1436 if self.op.storage_type:
1437 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1438 self.storage_type = self.op.storage_type
1439 else:
1440 self.storage_type = self._DetermineStorageType()
1441 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1442 if self.storage_type not in supported_storage_types:
1443 raise errors.OpPrereqError(
1444 "Storage reporting for storage type '%s' is not supported. Please"
1445 " use the --storage-type option to specify one of the supported"
1446 " storage types (%s) or set the default disk template to one that"
1447 " supports storage reporting." %
1448 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1449
1450 def Exec(self, feedback_fn):
1451 """Computes the list of nodes and their attributes.
1452
1453 """
1454 if self.op.storage_type:
1455 self.storage_type = self.op.storage_type
1456 else:
1457 self.storage_type = self._DetermineStorageType()
1458
1459 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1460
1461
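# Always ask the nodes for the storage name so result rows can be keyed by
# it; the node and type columns are filled in locally below.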
1462 if constants.SF_NAME in self.op.output_fields:
1463 fields = self.op.output_fields[:]
1464 else:
1465 fields = [constants.SF_NAME] + self.op.output_fields
1466
1467
1468 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1469 while extra in fields:
1470 fields.remove(extra)
1471
1472 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1473 name_idx = field_idx[constants.SF_NAME]
1474
1475 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
1476 data = self.rpc.call_storage_list(self.node_uuids,
1477 self.storage_type, st_args,
1478 self.op.name, fields)
1479
1480 result = []
1481
1482 for node_uuid in utils.NiceSort(self.node_uuids):
1483 node_name = self.cfg.GetNodeName(node_uuid)
1484 nresult = data[node_uuid]
1485 if nresult.offline:
1486 continue
1487
1488 msg = nresult.fail_msg
1489 if msg:
1490 self.LogWarning("Can't get storage data from node %s: %s",
1491 node_name, msg)
1492 continue
1493
1494 rows = dict([(row[name_idx], row) for row in nresult.payload])
1495
1496 for name in utils.NiceSort(rows.keys()):
1497 row = rows[name]
1498
1499 out = []
1500
1501 for field in self.op.output_fields:
1502 if field == constants.SF_NODE:
1503 val = node_name
1504 elif field == constants.SF_TYPE:
1505 val = self.storage_type
1506 elif field in field_idx:
1507 val = row[field_idx[field]]
1508 else:
1509 raise errors.ParameterError(field)
1510
1511 out.append(val)
1512
1513 result.append(out)
1514
1515 return result
1516
1517
1519 """Logical unit for removing a node.
1520
1521 """
1522 HPATH = "node-remove"
1523 HTYPE = constants.HTYPE_NODE
1524
1526 """Build hooks env.
1527
1528 """
1529 return {
1530 "OP_TARGET": self.op.node_name,
1531 "NODE_NAME": self.op.node_name,
1532 }
1533
1535 """Build hooks nodes.
1536
1537 This doesn't run on the target node in the pre phase as a failed
1538 node would then be impossible to remove.
1539
1540 """
1541 all_nodes = self.cfg.GetNodeList()
1542 try:
1543 all_nodes.remove(self.op.node_uuid)
1544 except ValueError:
1545 pass
1546 return (all_nodes, all_nodes)
1547
1576
1577 def Exec(self, feedback_fn):
1578 """Removes the node from the cluster.
1579
1580 """
1581 logging.info("Stopping the node daemon and removing configs from node %s",
1582 self.node.name)
1583
1584 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1585
1586 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1587 "Not owning BGL"
1588
1589 master_node = self.cfg.GetMasterNode()
1590 potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
1591 if modify_ssh_setup:
1592
1593
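# Have the master remove this node's SSH keys from the cluster's key files
# before the node leaves.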
1594 potential_master_candidate = \
1595 self.op.node_name in potential_master_candidates
1596 master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
1597 result = self.rpc.call_node_ssh_key_remove(
1598 [master_node],
1599 self.node.uuid, self.op.node_name,
1600 master_candidate_uuids, potential_master_candidates,
1601 self.node.master_candidate,
1602 potential_master_candidate,
1603 True,
1604 True,
1605 False,
1606 self.op.debug,
1607 self.op.verbose)
1608 result[master_node].Raise(
1609 "Could not remove the SSH key of node '%s' (UUID: %s)." %
1610 (self.op.node_name, self.node.uuid))
1611 WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
1612
1613
1614 AdjustCandidatePool(
1615 self, [self.node.uuid], master_node, potential_master_candidates,
1616 feedback_fn, modify_ssh_setup)
1617 self.cfg.RemoveNode(self.node.uuid)
1618
1619
1620 RunPostHook(self, self.node.uuid)
1621
1622
1623
1624 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1625 msg = result.fail_msg
1626 if msg:
1627 self.LogWarning("Errors encountered on the remote node while leaving"
1628 " the cluster: %s", msg)
1629
1630 cluster = self.cfg.GetClusterInfo()
1631
1632
1633 if self.node.master_candidate:
1634 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)
1635
1636
1637 if cluster.modify_etc_hosts:
1638 master_node_uuid = self.cfg.GetMasterNode()
1639 result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1640 constants.ETC_HOSTS_REMOVE,
1641 self.node.name, None)
1642 result.Raise("Can't update hosts file with new host data")
1643 RedistributeAncillaryFiles(self)
1644
1645
1647 """Repairs the volume group on a node.
1648
1649 """
1650 REQ_BGL = False
1651
1663
1665 self.needed_locks = {
1666 locking.LEVEL_NODE: [self.op.node_uuid],
1667 }
1668
1684
1686 """Check prerequisites.
1687
1688 """
1689 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1690
1691
1692 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1693 if not inst.disks_active:
1694 continue
1695 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
1696 check_nodes.discard(self.op.node_uuid)
1697 for inst_node_uuid in check_nodes:
1698 self._CheckFaultyDisks(inst, inst_node_uuid)
1699
1700 def Exec(self, feedback_fn):
1711