1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes
57
58
68
69
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # Ask the node daemon itself whether it owns the address; a failed RPC is
  # reported as an environment error of the requested severity.
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if result.payload:
    return
  msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
         " please fix and re-run this command" % secondary_ip)
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  raise errors.OpExecError(msg)
98
99
101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
122
124 """Build hooks env.
125
126 This will run on all nodes before, and on all nodes + the new node after.
127
128 """
129 return {
130 "OP_TARGET": self.op.node_name,
131 "NODE_NAME": self.op.node_name,
132 "NODE_PIP": self.op.primary_ip,
133 "NODE_SIP": self.op.secondary_ip,
134 "MASTER_CAPABLE": str(self.op.master_capable),
135 "VM_CAPABLE": str(self.op.vm_capable),
136 }
137
139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148
149 return (hook_nodes, hook_nodes)
150
def PreparePostHookNodes(self, post_hook_node_uuids):
  """Extend the post-hook node list with the newly added node.

  Returns a new list; the input list is left untouched.

  """
  nodes = list(post_hook_node_uuids)
  nodes.append(self.new_node.uuid)
  return nodes
153
155 """Check prerequisites.
156
157 This checks:
158 - the new node is not already in the config
159 - it is resolvable
160 - its parameters (single/dual homed) matches the cluster
161
162 Any errors are signaled by raising errors.OpPrereqError.
163
164 """
165 node_name = self.hostname.name
166 self.op.primary_ip = self.hostname.ip
167 if self.op.secondary_ip is None:
168 if self.primary_ip_family == netutils.IP6Address.family:
169 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
170 " IPv4 address must be given as secondary",
171 errors.ECODE_INVAL)
172 self.op.secondary_ip = self.op.primary_ip
173
174 secondary_ip = self.op.secondary_ip
175 if not netutils.IP4Address.IsValid(secondary_ip):
176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
177 " address" % secondary_ip, errors.ECODE_INVAL)
178
179 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
180 if not self.op.readd and existing_node_info is not None:
181 raise errors.OpPrereqError("Node %s is already in the configuration" %
182 node_name, errors.ECODE_EXISTS)
183 elif self.op.readd and existing_node_info is None:
184 raise errors.OpPrereqError("Node %s is not in the configuration" %
185 node_name, errors.ECODE_NOENT)
186
187 self.changed_primary_ip = False
188
189 for existing_node in self.cfg.GetAllNodesInfo().values():
190 if self.op.readd and node_name == existing_node.name:
191 if existing_node.secondary_ip != secondary_ip:
192 raise errors.OpPrereqError("Readded node doesn't have the same IP"
193 " address configuration as before",
194 errors.ECODE_INVAL)
195 if existing_node.primary_ip != self.op.primary_ip:
196 self.changed_primary_ip = True
197
198 continue
199
200 if (existing_node.primary_ip == self.op.primary_ip or
201 existing_node.secondary_ip == self.op.primary_ip or
202 existing_node.primary_ip == secondary_ip or
203 existing_node.secondary_ip == secondary_ip):
204 raise errors.OpPrereqError("New node ip address(es) conflict with"
205 " existing node %s" % existing_node.name,
206 errors.ECODE_NOTUNIQUE)
207
208
209
210 if self.op.readd:
211 assert existing_node_info is not None, \
212 "Can't retrieve locked node %s" % node_name
213 for attr in self._NFLAGS:
214 if getattr(self.op, attr) is None:
215 setattr(self.op, attr, getattr(existing_node_info, attr))
216 else:
217 for attr in self._NFLAGS:
218 if getattr(self.op, attr) is None:
219 setattr(self.op, attr, True)
220
221 if self.op.readd and not self.op.vm_capable:
222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
223 if pri or sec:
224 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
225 " flag set to false, but it already holds"
226 " instances" % node_name,
227 errors.ECODE_STATE)
228
229
230
231 myself = self.cfg.GetMasterNodeInfo()
232 master_singlehomed = myself.secondary_ip == myself.primary_ip
233 newbie_singlehomed = secondary_ip == self.op.primary_ip
234 if master_singlehomed != newbie_singlehomed:
235 if master_singlehomed:
236 raise errors.OpPrereqError("The master has no secondary ip but the"
237 " new node has one",
238 errors.ECODE_INVAL)
239 else:
240 raise errors.OpPrereqError("The master has a secondary ip but the"
241 " new node doesn't have one",
242 errors.ECODE_INVAL)
243
244
245 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
246 raise errors.OpPrereqError("Node not reachable by ping",
247 errors.ECODE_ENVIRON)
248
249 if not newbie_singlehomed:
250
251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
252 source=myself.secondary_ip):
253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
254 " based ping to node daemon port",
255 errors.ECODE_ENVIRON)
256
257 if self.op.readd:
258 exceptions = [existing_node_info.uuid]
259 else:
260 exceptions = []
261
262 if self.op.master_capable:
263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
264 else:
265 self.master_candidate = False
266
267 self.node_group = None
268 if self.op.readd:
269 self.new_node = existing_node_info
270 self.node_group = existing_node_info.group
271 else:
272 self.node_group = self.cfg.LookupNodeGroup(self.op.group)
273 self.new_node = objects.Node(name=node_name,
274 primary_ip=self.op.primary_ip,
275 secondary_ip=secondary_ip,
276 master_candidate=self.master_candidate,
277 offline=False, drained=False,
278 group=self.node_group, ndparams={})
279
280 if self.op.ndparams:
281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
283 "node", "cluster or group")
284
285 if self.op.hv_state:
286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
287
288 if self.op.disk_state:
289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
290
291
292
293 rpcrunner = rpc.DnsOnlyRunner()
294 result = rpcrunner.call_version([node_name])[node_name]
295 result.Raise("Can't get version information from node %s" % node_name,
296 prereq=True)
297 if constants.PROTOCOL_VERSION == result.payload:
298 logging.info("Communication to node %s fine, sw version %s match",
299 node_name, result.payload)
300 else:
301 raise errors.OpPrereqError("Version mismatch master version %s,"
302 " node version %s" %
303 (constants.PROTOCOL_VERSION, result.payload),
304 errors.ECODE_ENVIRON)
305
306 vg_name = self.cfg.GetVGName()
307 if vg_name is not None:
308 vparams = {constants.NV_PVLIST: [vg_name]}
309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310 cname = self.cfg.GetClusterName()
311 result = rpcrunner.call_node_verify_light(
312 [node_name], vparams, cname,
313 self.cfg.GetClusterInfo().hvparams,
314 {node_name: self.node_group},
315 self.cfg.GetAllNodeGroupsInfoDict()
316 )[node_name]
317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
318 if errmsgs:
319 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
339
def Exec(self, feedback_fn):
  """Adds the new node to the cluster.

  """
  assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
    "Not owning BGL"

  # New nodes are recorded as powered on (out-of-band power state).
  self.new_node.powered = True

  # A re-add wipes the offline/drained flags: the operator is explicitly
  # declaring the node usable again.
  if self.op.readd:
    self.new_node.offline = False
    self.new_node.drained = False
    self.LogInfo("Readding a node, the offline/drained flags were reset")

  self.new_node.master_candidate = self.master_candidate
  if self.changed_primary_ip:
    self.new_node.primary_ip = self.op.primary_ip

  # Copy the capability flags (see _NFLAGS) from the opcode onto the node.
  for attr in self._NFLAGS:
    setattr(self.new_node, attr, getattr(self.op, attr))

  if self.new_node.master_candidate:
    self.LogInfo("Node will be a master candidate")

  if self.op.ndparams:
    self.new_node.ndparams = self.op.ndparams
  else:
    self.new_node.ndparams = {}

  if self.op.hv_state:
    self.new_node.hv_state_static = self.new_hv_state

  if self.op.disk_state:
    self.new_node.disk_state_static = self.new_disk_state

  # Update /etc/hosts on the master node; distribution to the other nodes
  # happens via RedistributeAncillaryFiles below.
  if self.cfg.GetClusterInfo().modify_etc_hosts:
    master_node = self.cfg.GetMasterNode()
    result = self.rpc.call_etc_hosts_modify(
      master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
      self.hostname.ip)
    result.Raise("Can't update hosts file with new host data")

  # For multi-homed nodes, verify the node really owns its secondary IP;
  # prereq=False means a failure raises OpExecError at this stage.
  if self.new_node.secondary_ip != self.new_node.primary_ip:
    _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                             False)

  node_verifier_uuids = [self.cfg.GetMasterNode()]
  node_verify_param = {
    constants.NV_NODELIST: ([self.new_node.name], {}),
  }

  # Run a node-verify from the master against the new node, checking
  # ssh/hostname reachability before committing anything.
  result = self.rpc.call_node_verify(
    node_verifier_uuids, node_verify_param,
    self.cfg.GetClusterName(),
    self.cfg.GetClusterInfo().hvparams,
    {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
    self.cfg.GetAllNodeGroupsInfoDict()
  )
  for verifier in node_verifier_uuids:
    result[verifier].Raise("Cannot communicate with node %s" % verifier)
    nl_payload = result[verifier].payload[constants.NV_NODELIST]
    if nl_payload:
      for failed in nl_payload:
        feedback_fn("ssh/hostname verification failed"
                    " (checking from %s): %s" %
                    (verifier, nl_payload[failed]))
      raise errors.OpExecError("ssh/hostname verification failed")

  self._InitOpenVSwitch()

  if self.op.readd:
    self.context.ReaddNode(self.new_node)
    RedistributeAncillaryFiles(self)
    # Node already exists in the configuration; push the updated object.
    self.cfg.Update(self.new_node, feedback_fn)
    # Make sure a re-added non-candidate node drops any stale
    # master-candidate state it may still hold.
    if not self.new_node.master_candidate:
      result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
      result.Warn("Node failed to demote itself from master candidate status",
                  self.LogWarning)
  else:
    self.context.AddNode(self.new_node, self.proc.GetECId())
    RedistributeAncillaryFiles(self)

  cluster = self.cfg.GetClusterInfo()

  # Issue a fresh client certificate for the node; its digest is tracked in
  # the cluster's candidate-certificate map only for master candidates.
  digest = CreateNewClientCert(self, self.new_node.uuid)
  if self.new_node.master_candidate:
    utils.AddNodeToCandidateCerts(self.new_node.uuid, digest,
                                  cluster.candidate_certs)
    self.cfg.Update(cluster, feedback_fn)
  else:
    if self.new_node.uuid in cluster.candidate_certs:
      utils.RemoveNodeFromCandidateCerts(self.new_node.uuid,
                                         cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)

  EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
448
450 """Modifies the parameters of a node.
451
452 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
453 to the node role (as _ROLE_*)
454 @cvar _R2F: a dictionary from node role to tuples of flags
455 @cvar _FLAGS: a list of attribute names corresponding to the flags
456
457 """
458 HPATH = "node-modify"
459 HTYPE = constants.HTYPE_NODE
460 REQ_BGL = False
461 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
462 _F2R = {
463 (True, False, False): _ROLE_CANDIDATE,
464 (False, True, False): _ROLE_DRAINED,
465 (False, False, True): _ROLE_OFFLINE,
466 (False, False, False): _ROLE_REGULAR,
467 }
468 _R2F = dict((v, k) for k, v in _F2R.items())
469 _FLAGS = ["master_candidate", "drained", "offline"]
470
472 (self.op.node_uuid, self.op.node_name) = \
473 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
474 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
475 self.op.master_capable, self.op.vm_capable,
476 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
477 self.op.disk_state]
478 if all_mods.count(None) == len(all_mods):
479 raise errors.OpPrereqError("Please pass at least one modification",
480 errors.ECODE_INVAL)
481 if all_mods.count(True) > 1:
482 raise errors.OpPrereqError("Can't set the node into more than one"
483 " state at the same time",
484 errors.ECODE_INVAL)
485
486
487 self.might_demote = (self.op.master_candidate is False or
488 self.op.offline is True or
489 self.op.drained is True or
490 self.op.master_capable is False)
491
492 if self.op.secondary_ip:
493 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
494 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
495 " address" % self.op.secondary_ip,
496 errors.ECODE_INVAL)
497
498 self.lock_all = self.op.auto_promote and self.might_demote
499 self.lock_instances = self.op.secondary_ip is not None
500
507
537
539 """Build hooks env.
540
541 This runs on the master node.
542
543 """
544 return {
545 "OP_TARGET": self.op.node_name,
546 "MASTER_CANDIDATE": str(self.op.master_candidate),
547 "OFFLINE": str(self.op.offline),
548 "DRAINED": str(self.op.drained),
549 "MASTER_CAPABLE": str(self.op.master_capable),
550 "VM_CAPABLE": str(self.op.vm_capable),
551 }
552
554 """Build hooks nodes.
555
556 """
557 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
558 return (nl, nl)
559
561 """Check prerequisites.
562
563 This only checks the instance list against the existing names.
564
565 """
566 node = self.cfg.GetNodeInfo(self.op.node_uuid)
567 if self.lock_instances:
568 affected_instances = \
569 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
570
571
572 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
573 wanted_instance_names = frozenset([inst.name for inst in
574 affected_instances.values()])
575 if wanted_instance_names - owned_instance_names:
576 raise errors.OpPrereqError("Instances affected by changing node %s's"
577 " secondary IP address have changed since"
578 " locks were acquired, wanted '%s', have"
579 " '%s'; retry the operation" %
580 (node.name,
581 utils.CommaJoin(wanted_instance_names),
582 utils.CommaJoin(owned_instance_names)),
583 errors.ECODE_STATE)
584 else:
585 affected_instances = None
586
587 if (self.op.master_candidate is not None or
588 self.op.drained is not None or
589 self.op.offline is not None):
590
591 if node.uuid == self.cfg.GetMasterNode():
592 raise errors.OpPrereqError("The master role can be changed"
593 " only via master-failover",
594 errors.ECODE_INVAL)
595
596 if self.op.master_candidate and not node.master_capable:
597 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
598 " it a master candidate" % node.name,
599 errors.ECODE_STATE)
600
601 if self.op.vm_capable is False:
602 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
603 if ipri or isec:
604 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
605 " the vm_capable flag" % node.name,
606 errors.ECODE_STATE)
607
608 if node.master_candidate and self.might_demote and not self.lock_all:
609 assert not self.op.auto_promote, "auto_promote set but lock_all not"
610
611
612 (mc_remaining, mc_should, _) = \
613 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
614 if mc_remaining < mc_should:
615 raise errors.OpPrereqError("Not enough master candidates, please"
616 " pass auto promote option to allow"
617 " promotion (--auto-promote or RAPI"
618 " auto_promote=True)", errors.ECODE_STATE)
619
620 self.old_flags = old_flags = (node.master_candidate,
621 node.drained, node.offline)
622 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
623 self.old_role = old_role = self._F2R[old_flags]
624
625
626 for attr in self._FLAGS:
627 if getattr(self.op, attr) is False and getattr(node, attr) is False:
628 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
629 setattr(self.op, attr, None)
630
631
632
633
634
635 if SupportsOob(self.cfg, node):
636 if self.op.offline is False and not (node.powered or
637 self.op.powered is True):
638 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
639 " offline status can be reset") %
640 self.op.node_name, errors.ECODE_STATE)
641 elif self.op.powered is not None:
642 raise errors.OpPrereqError(("Unable to change powered state for node %s"
643 " as it does not support out-of-band"
644 " handling") % self.op.node_name,
645 errors.ECODE_STATE)
646
647
648 if (self.op.drained is False or self.op.offline is False or
649 (self.op.master_capable and not node.master_capable)):
650 if _DecideSelfPromotion(self):
651 self.op.master_candidate = True
652 self.LogInfo("Auto-promoting node to master candidate")
653
654
655 if self.op.master_capable is False and node.master_candidate:
656 self.LogInfo("Demoting from master candidate")
657 self.op.master_candidate = False
658
659
660 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
661 if self.op.master_candidate:
662 new_role = self._ROLE_CANDIDATE
663 elif self.op.drained:
664 new_role = self._ROLE_DRAINED
665 elif self.op.offline:
666 new_role = self._ROLE_OFFLINE
667 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
668
669
670 new_role = self._ROLE_REGULAR
671 else:
672 new_role = old_role
673
674 self.new_role = new_role
675
676 if old_role == self._ROLE_OFFLINE and new_role != old_role:
677
678 result = self.rpc.call_version([node.uuid])[node.uuid]
679 if result.fail_msg:
680 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
681 " to report its version: %s" %
682 (node.name, result.fail_msg),
683 errors.ECODE_STATE)
684 else:
685 self.LogWarning("Transitioning node from offline to online state"
686 " without using re-add. Please make sure the node"
687 " is healthy!")
688
689
690
691
692 if self.op.secondary_ip:
693
694 master = self.cfg.GetMasterNodeInfo()
695 master_singlehomed = master.secondary_ip == master.primary_ip
696 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
697 if self.op.force and node.uuid == master.uuid:
698 self.LogWarning("Transitioning from single-homed to multi-homed"
699 " cluster; all nodes will require a secondary IP"
700 " address")
701 else:
702 raise errors.OpPrereqError("Changing the secondary ip on a"
703 " single-homed cluster requires the"
704 " --force option to be passed, and the"
705 " target node to be the master",
706 errors.ECODE_INVAL)
707 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
708 if self.op.force and node.uuid == master.uuid:
709 self.LogWarning("Transitioning from multi-homed to single-homed"
710 " cluster; secondary IP addresses will have to be"
711 " removed")
712 else:
713 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
714 " same as the primary IP on a multi-homed"
715 " cluster, unless the --force option is"
716 " passed, and the target node is the"
717 " master", errors.ECODE_INVAL)
718
719 assert not (set([inst.name for inst in affected_instances.values()]) -
720 self.owned_locks(locking.LEVEL_INSTANCE))
721
722 if node.offline:
723 if affected_instances:
724 msg = ("Cannot change secondary IP address: offline node has"
725 " instances (%s) configured to use it" %
726 utils.CommaJoin(
727 [inst.name for inst in affected_instances.values()]))
728 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
729 else:
730
731
732 for instance in affected_instances.values():
733 CheckInstanceState(self, instance, INSTANCE_DOWN,
734 msg="cannot change secondary ip")
735
736 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
737 if master.uuid != node.uuid:
738
739 if not netutils.TcpPing(self.op.secondary_ip,
740 constants.DEFAULT_NODED_PORT,
741 source=master.secondary_ip):
742 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
743 " based ping to node daemon port",
744 errors.ECODE_ENVIRON)
745
746 if self.op.ndparams:
747 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
748 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
749 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
750 "node", "cluster or group")
751 self.new_ndparams = new_ndparams
752
753 if self.op.hv_state:
754 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
755 node.hv_state_static)
756
757 if self.op.disk_state:
758 self.new_disk_state = \
759 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
760
def Exec(self, feedback_fn):
  """Modifies a node.

  """
  node = self.cfg.GetNodeInfo(self.op.node_uuid)
  # List of (parameter, new value) pairs, returned to the caller.
  result = []

  if self.op.ndparams:
    node.ndparams = self.new_ndparams

  if self.op.powered is not None:
    node.powered = self.op.powered

  if self.op.hv_state:
    node.hv_state_static = self.new_hv_state

  if self.op.disk_state:
    node.disk_state_static = self.new_disk_state

  for attr in ["master_capable", "vm_capable"]:
    val = getattr(self.op, attr)
    if val is not None:
      setattr(node, attr, val)
      result.append((attr, str(val)))

  if self.new_role != self.old_role:
    # Ask a demoted master candidate to clean up its own MC state, unless
    # it is going offline (in which case it cannot be reached anyway).
    if self.old_role == self._ROLE_CANDIDATE and \
        self.new_role != self._ROLE_OFFLINE:
      msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
      if msg:
        self.LogWarning("Node failed to demote itself: %s", msg)

    # Translate the new role back into the three boolean flags and record
    # every flag that actually changed.
    new_flags = self._R2F[self.new_role]
    for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
      if of != nf:
        result.append((desc, str(nf)))
    (node.master_candidate, node.drained, node.offline) = new_flags

    # lock_all implies auto-promotion was requested; rebalance the
    # master-candidate pool while we hold all node locks.
    if self.lock_all:
      AdjustCandidatePool(self, [node.uuid], feedback_fn)

    cluster = self.cfg.GetClusterInfo()
    # Keep the candidate-certificate map in sync with the role change.
    if self.new_role == self._ROLE_CANDIDATE:
      AddNodeCertToCandidateCerts(self, node.uuid, cluster)

    if self.old_role == self._ROLE_CANDIDATE:
      RemoveNodeCertFromCandidateCerts(node.uuid, cluster)

  if self.op.secondary_ip:
    node.secondary_ip = self.op.secondary_ip
    result.append(("secondary_ip", self.op.secondary_ip))

  # Persist all node object changes to the configuration.
  self.cfg.Update(node, feedback_fn)

  # Exactly one of old/new role is master-candidate: the MC set changed,
  # so the node must be re-registered with the context.
  if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
    self.context.ReaddNode(node)

  EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

  return result
827
828
830 """Powercycles a node.
831
832 """
833 REQ_BGL = False
834
843
845 """Locking for PowercycleNode.
846
847 This is a last-resort option and shouldn't block on other
848 jobs. Therefore, we grab no locks.
849
850 """
851 self.needed_locks = {}
852
def Exec(self, feedback_fn):
  """Reboots a node.

  """
  # Powercycle via the cluster's default hypervisor, using that
  # hypervisor's cluster-level parameters.
  hv_type = self.cfg.GetHypervisorType()
  cluster_hvparams = self.cfg.GetClusterInfo().hvparams[hv_type]
  result = self.rpc.call_node_powercycle(self.op.node_uuid, hv_type,
                                         cluster_hvparams)
  result.Raise("Failed to schedule the reboot")
  return result.payload
864
865
868
869
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  # Keep only the instances whose primary node is the given node.
  def _IsPrimaryHere(inst):
    return inst.primary_node == node_uuid

  return _GetNodeInstancesInner(cfg, _IsPrimaryHere)
876
877
884
885
887 """Returns a list of all primary and secondary instances on a node.
888
889 """
890
891 return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
892
893
895 """Evacuates instances off a list of nodes.
896
897 """
898 REQ_BGL = False
899
902
904 (self.op.node_uuid, self.op.node_name) = \
905 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
906
907 if self.op.remote_node is not None:
908 (self.op.remote_node_uuid, self.op.remote_node) = \
909 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
910 self.op.remote_node)
911 assert self.op.remote_node
912
913 if self.op.node_uuid == self.op.remote_node_uuid:
914 raise errors.OpPrereqError("Can not use evacuated node as a new"
915 " secondary node", errors.ECODE_INVAL)
916
917 if self.op.mode != constants.NODE_EVAC_SEC:
918 raise errors.OpPrereqError("Without the use of an iallocator only"
919 " secondary instances can be evacuated",
920 errors.ECODE_INVAL)
921
922
923 self.share_locks = ShareAll()
924 self.needed_locks = {
925 locking.LEVEL_INSTANCE: [],
926 locking.LEVEL_NODEGROUP: [],
927 locking.LEVEL_NODE: [],
928 }
929
930
931
932 self.lock_nodes = self._DetermineNodes()
933
935 """Gets the list of node UUIDs to operate on.
936
937 """
938 if self.op.remote_node is None:
939
940 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
941 else:
942 group_nodes = frozenset([self.op.remote_node_uuid])
943
944
945 return set([self.op.node_uuid]) | group_nodes
946
975
991
993
994 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
995 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
996 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
997
998 need_nodes = self._DetermineNodes()
999
1000 if not owned_nodes.issuperset(need_nodes):
1001 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
1002 " locks were acquired, current nodes are"
1003 " are '%s', used to be '%s'; retry the"
1004 " operation" %
1005 (self.op.node_name,
1006 utils.CommaJoin(need_nodes),
1007 utils.CommaJoin(owned_nodes)),
1008 errors.ECODE_STATE)
1009
1010 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
1011 if owned_groups != wanted_groups:
1012 raise errors.OpExecError("Node groups changed since locks were acquired,"
1013 " current groups are '%s', used to be '%s';"
1014 " retry the operation" %
1015 (utils.CommaJoin(wanted_groups),
1016 utils.CommaJoin(owned_groups)))
1017
1018
1019 self.instances = self._DetermineInstances()
1020 self.instance_names = [i.name for i in self.instances]
1021
1022 if set(self.instance_names) != owned_instance_names:
1023 raise errors.OpExecError("Instances on node '%s' changed since locks"
1024 " were acquired, current instances are '%s',"
1025 " used to be '%s'; retry the operation" %
1026 (self.op.node_name,
1027 utils.CommaJoin(self.instance_names),
1028 utils.CommaJoin(owned_instance_names)))
1029
1030 if self.instance_names:
1031 self.LogInfo("Evacuating instances from node '%s': %s",
1032 self.op.node_name,
1033 utils.CommaJoin(utils.NiceSort(self.instance_names)))
1034 else:
1035 self.LogInfo("No instances to evacuate from node '%s'",
1036 self.op.node_name)
1037
1038 if self.op.remote_node is not None:
1039 for i in self.instances:
1040 if i.primary_node == self.op.remote_node_uuid:
1041 raise errors.OpPrereqError("Node %s is the primary node of"
1042 " instance %s, cannot use it as"
1043 " secondary" %
1044 (self.op.remote_node, i.name),
1045 errors.ECODE_INVAL)
1046
def Exec(self, feedback_fn):
  """Builds and returns the instance-evacuation jobs.

  """
  # Exactly one of iallocator / remote_node must have been specified.
  assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

  if not self.instance_names:
    # Nothing to evacuate: submit no jobs.
    jobs = []

  elif self.op.iallocator is not None:
    # Let the iallocator compute the new placement for all instances.
    req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                   instances=list(self.instance_names))
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute node evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

  elif self.op.remote_node is not None:
    # With an explicit target node only secondary evacuation is possible
    # (enforced in CheckArguments); one replace-disks job per instance.
    assert self.op.mode == constants.NODE_EVAC_SEC
    jobs = [
      [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                      remote_node=self.op.remote_node,
                                      disks=[],
                                      mode=constants.REPLACE_DISK_CHG,
                                      early_release=self.op.early_release)]
      for instance_name in self.instance_names]

  else:
    raise errors.ProgrammerError("No iallocator or remote node")

  return ResultWithJobs(jobs)
1084
1085
1087 """Migrate all instances from a node.
1088
1089 """
1090 HPATH = "node-migrate"
1091 HTYPE = constants.HTYPE_NODE
1092 REQ_BGL = False
1093
1096
1098 (self.op.node_uuid, self.op.node_name) = \
1099 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1100
1101 self.share_locks = ShareAll()
1102 self.needed_locks = {
1103 locking.LEVEL_NODE: [self.op.node_uuid],
1104 }
1105
1107 """Build hooks env.
1108
1109 This runs on the master, the primary and all the secondaries.
1110
1111 """
1112 return {
1113 "NODE_NAME": self.op.node_name,
1114 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1115 }
1116
1118 """Build hooks nodes.
1119
1120 """
1121 nl = [self.cfg.GetMasterNode()]
1122 return (nl, nl)
1123
1126
def Exec(self, feedback_fn):
  """Submits one migration job per primary instance on the node.

  """
  # Build a single-opcode job for every instance whose primary node is the
  # one being evacuated.
  jobs = [
    [opcodes.OpInstanceMigrate(
      instance_name=inst.name,
      mode=self.op.mode,
      live=self.op.live,
      iallocator=self.op.iallocator,
      target_node=self.op.target_node,
      allow_runtime_changes=self.op.allow_runtime_changes,
      ignore_ipolicy=self.op.ignore_ipolicy)]
    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

  # Sanity check: ExpandNames acquired exactly the one node lock.
  assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
          frozenset([self.op.node_uuid]))

  return ResultWithJobs(jobs)
1149
1150
1169
1170
1172 """Logical unit for modifying a storage volume on a node.
1173
1174 """
1175 REQ_BGL = False
1176
1178 (self.op.node_uuid, self.op.node_name) = \
1179 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1180
1181 storage_type = self.op.storage_type
1182
1183 try:
1184 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1185 except KeyError:
1186 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1187 " modified" % storage_type,
1188 errors.ECODE_INVAL)
1189
1190 diff = set(self.op.changes.keys()) - modifiable
1191 if diff:
1192 raise errors.OpPrereqError("The following fields can not be modified for"
1193 " storage units of type '%s': %r" %
1194 (storage_type, list(diff)),
1195 errors.ECODE_INVAL)
1196
1202
1204 self.needed_locks = {
1205 locking.LEVEL_NODE: self.op.node_uuid,
1206 }
1207
def Exec(self, feedback_fn):
  """Applies the requested changes to a storage unit on the node.

  """
  op = self.op
  # Resolve the per-storage-type arguments before issuing the RPC.
  storage_args = _GetStorageTypeArgs(self.cfg, op.storage_type)
  result = self.rpc.call_storage_modify(op.node_uuid, op.storage_type,
                                        storage_args, op.name, op.changes)
  result.Raise("Failed to modify storage unit '%s' on %s" %
               (op.name, op.node_name))
1219
1221 """Checks whether all selected fields are valid according to fields.
1222
1223 @type fields: L{utils.FieldSet}
1224 @param fields: fields set
1225 @type selected: L{utils.FieldSet}
1226 @param selected: fields set
1227
1228 """
1229 delta = fields.NonMatching(selected)
1230 if delta:
1231 raise errors.OpPrereqError("Unknown output fields selected: %s"
1232 % ",".join(delta), errors.ECODE_INVAL)
1233
1234
1236 """Logical unit for getting volumes on node(s).
1237
1238 """
1239 REQ_BGL = False
1240
1246
1259
1260 - def Exec(self, feedback_fn):
1311
1312
1314 """Logical unit for getting information on storage units on node(s).
1315
1316 """
1317 REQ_BGL = False
1318
1322
1335
1337 """Determines the default storage type of the cluster.
1338
1339 """
1340 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1341 default_storage_type = \
1342 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1343 return default_storage_type
1344
1346 """Check prerequisites.
1347
1348 """
1349 if self.op.storage_type:
1350 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1351 self.storage_type = self.op.storage_type
1352 else:
1353 self.storage_type = self._DetermineStorageType()
1354 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1355 if self.storage_type not in supported_storage_types:
1356 raise errors.OpPrereqError(
1357 "Storage reporting for storage type '%s' is not supported. Please"
1358 " use the --storage-type option to specify one of the supported"
1359 " storage types (%s) or set the default disk template to one that"
1360 " supports storage reporting." %
1361 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1362
1363 - def Exec(self, feedback_fn):
1364 """Computes the list of nodes and their attributes.
1365
1366 """
1367 if self.op.storage_type:
1368 self.storage_type = self.op.storage_type
1369 else:
1370 self.storage_type = self._DetermineStorageType()
1371
1372 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1373
1374
1375 if constants.SF_NAME in self.op.output_fields:
1376 fields = self.op.output_fields[:]
1377 else:
1378 fields = [constants.SF_NAME] + self.op.output_fields
1379
1380
1381 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1382 while extra in fields:
1383 fields.remove(extra)
1384
1385 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1386 name_idx = field_idx[constants.SF_NAME]
1387
1388 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
1389 data = self.rpc.call_storage_list(self.node_uuids,
1390 self.storage_type, st_args,
1391 self.op.name, fields)
1392
1393 result = []
1394
1395 for node_uuid in utils.NiceSort(self.node_uuids):
1396 node_name = self.cfg.GetNodeName(node_uuid)
1397 nresult = data[node_uuid]
1398 if nresult.offline:
1399 continue
1400
1401 msg = nresult.fail_msg
1402 if msg:
1403 self.LogWarning("Can't get storage data from node %s: %s",
1404 node_name, msg)
1405 continue
1406
1407 rows = dict([(row[name_idx], row) for row in nresult.payload])
1408
1409 for name in utils.NiceSort(rows.keys()):
1410 row = rows[name]
1411
1412 out = []
1413
1414 for field in self.op.output_fields:
1415 if field == constants.SF_NODE:
1416 val = node_name
1417 elif field == constants.SF_TYPE:
1418 val = self.storage_type
1419 elif field in field_idx:
1420 val = row[field_idx[field]]
1421 else:
1422 raise errors.ParameterError(field)
1423
1424 out.append(val)
1425
1426 result.append(out)
1427
1428 return result
1429
1430
1432 """Logical unit for removing a node.
1433
1434 """
1435 HPATH = "node-remove"
1436 HTYPE = constants.HTYPE_NODE
1437
1439 """Build hooks env.
1440
1441 """
1442 return {
1443 "OP_TARGET": self.op.node_name,
1444 "NODE_NAME": self.op.node_name,
1445 }
1446
1448 """Build hooks nodes.
1449
1450 This doesn't run on the target node in the pre phase as a failed
1451 node would then be impossible to remove.
1452
1453 """
1454 all_nodes = self.cfg.GetNodeList()
1455 try:
1456 all_nodes.remove(self.op.node_uuid)
1457 except ValueError:
1458 pass
1459 return (all_nodes, all_nodes)
1460
1489
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    Tears the node out of the cluster configuration first, then tells the
    node itself to leave, and finally cleans up cluster-wide state
    (candidate certificates, /etc/hosts, ancillary files).

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Removing a node is a cluster-wide operation, so the Big Ganeti Lock
    # must be held
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote new master candidates if needed, then drop the node from the
    # cluster configuration
    AdjustCandidatePool(self, [self.node.uuid], feedback_fn)
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it is removed
    RunPostHook(self, self.node.name)

    # Tell the node daemon to leave the cluster (and, if configured, undo
    # the SSH setup); errors here are only warnings since the node is
    # already gone from the configuration
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove the node's certificate from the candidate certificate map and
    # persist the updated cluster configuration
    if self.node.master_candidate:
      utils.RemoveNodeFromCandidateCerts(self.node.uuid,
                                         cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)

    # Remove the node's entry from /etc/hosts on the master and push the
    # updated ancillary files to the remaining nodes
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1533
1534
1536 """Repairs the volume group on a node.
1537
1538 """
1539 REQ_BGL = False
1540
1552
1554 self.needed_locks = {
1555 locking.LEVEL_NODE: [self.op.node_uuid],
1556 }
1557
1573
1575 """Check prerequisites.
1576
1577 """
1578 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1579
1580
1581 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1582 if not inst.disks_active:
1583 continue
1584 check_nodes = set(inst.all_nodes)
1585 check_nodes.discard(self.op.node_uuid)
1586 for inst_node_uuid in check_nodes:
1587 self._CheckFaultyDisks(inst, inst_node_uuid)
1588
1589 - def Exec(self, feedback_fn):
1600