31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # exclude the node being added from the "pre" hook nodes
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

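    # check the new node's addresses against every node already in the config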
    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # after this block, the _NFLAGS attributes of the opcode are never None
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

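    # the new node must match the cluster's single-/multi-homed setup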
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # check that the node is reachable on the node daemon port
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability of the secondary ip as well
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    self.node_group = None
    if self.op.readd:
      self.new_node = existing_node_info
      self.node_group = existing_node_info.group
    else:
      self.node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=self.node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

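    # check connectivity and protocol version of the new node (resolved via
    # DNS only, as it may not be in the configuration yet)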
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
        [node_name], vparams, cname,
        self.cfg.GetClusterInfo().hvparams,
        {node_name: self.node_group},
        self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

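    # the newly added node is assumed to be powered on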
    self.new_node.powered = True

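    # for re-adds, reset the offline and drained flags so that RPC calls to
    # the node work later in the procedure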
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")

    self.new_node.master_candidate = self.master_candidate
    if self.changed_primary_ip:
      self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags from the opcode
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about a possible promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

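    # update /etc/hosts on the master node if the cluster manages it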
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

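    # let the master verify SSH and hostname resolution for the new node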
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      }

    result = self.rpc.call_node_verify(
      node_verifier_uuids, node_verify_param,
      self.cfg.GetClusterName(),
      self.cfg.GetClusterInfo().hvparams,
      {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
      self.cfg.GetAllNodeGroupsInfoDict()
      )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure the re-added node's updated flags are written out
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the node does not keep a stale master candidate role
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

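    # keep the candidate certificate map in sync with the node's role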
    digest = GetClientCertDigest(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # whether this operation could end up demoting the node from the master
    # candidate role
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node we would be missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # requests to unset flags that are already unset are no-ops
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

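    # resetting the offline flag requires the node to be powered on; the
    # powered state itself can only be changed on nodes with OOB support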
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # if the node is being de-drained/de-offlined or made master-capable,
    # consider promoting it to master candidate
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # a node losing its master-capable flag can no longer be a candidate
    if self.op.master_capable is False and node.master_candidate:
      if self.op.node_uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("Master must remain master capable",
                                   errors.ECODE_STATE)
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

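    # compute the new role from the requested flag changes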
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # at least one flag is being explicitly unset, so the node becomes a
      # regular node again
      new_role = self._ROLE_REGULAR
    else:
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

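    # a secondary IP change must keep the cluster consistently single- or
    # multi-homed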
    if self.op.secondary_ip:
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # on an online node, check that the affected instances are down and
        # that the new address is actually configured and reachable
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from the master's secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    if self.new_role != self.old_role:
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

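      # when leaving the master candidate role (except to go offline, where
      # the node cannot be contacted anyway), tell the node to demote itself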
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      # we locked all nodes, so adjust the candidate pool as well
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # keep the candidate certificates in sync with the new role
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

    # notify the cluster context when the master candidate flag changes
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

844 """Locking for PowercycleNode.
845
846 This is a last-resort option and shouldn't block on other
847 jobs. Therefore, we grab no locks.
848
849 """
850 self.needed_locks = {}
851
  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)



def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                cfg.GetInstanceNodes(inst.uuid))


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # determine nodes optimistically; this is verified once the locks have
    # been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # the iallocator will choose any node in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # determine the nodes to be locked
    return set([self.op.node_uuid]) | group_nodes


  def CheckPrereq(self):
    # verify that the locks acquired during ExpandNames still cover everything
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # no instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # let the iallocator compute the evacuation targets
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      # evacuate secondaries by replacing disks onto the given remote node
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

1109 """Build hooks env.
1110
1111 This runs on the master, the primary and all the secondaries.
1112
1113 """
1114 return {
1115 "NODE_NAME": self.op.node_name,
1116 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1117 }
1118
1120 """Build hooks nodes.
1121
1122 """
1123 nl = [self.cfg.GetMasterNode()]
1124 return (nl, nl)

  def Exec(self, feedback_fn):
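    # submit one migration job per primary instance on the node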
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # only the node being migrated off is locked at this point
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):

1312 """Logical unit for getting information on storage units on node(s).
1313
1314 """
1315 REQ_BGL = False
1316
1320
1333
1335 """Determines the default storage type of the cluster.
1336
1337 """
1338 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1339 default_storage_type = \
1340 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1341 return default_storage_type
1342
1344 """Check prerequisites.
1345
1346 """
1347 if self.op.storage_type:
1348 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1349 self.storage_type = self.op.storage_type
1350 else:
1351 self.storage_type = self._DetermineStorageType()
1352 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1353 if self.storage_type not in supported_storage_types:
1354 raise errors.OpPrereqError(
1355 "Storage reporting for storage type '%s' is not supported. Please"
1356 " use the --storage-type option to specify one of the supported"
1357 " storage types (%s) or set the default disk template to one that"
1358 " supports storage reporting." %
1359 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1360
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

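    # always fetch the volume name so that rows can be keyed by it; the node
    # and type columns are filled in locally below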
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # never ask the backend for the node or type fields; they are only known
    # to this LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

1437 """Build hooks env.
1438
1439 """
1440 return {
1441 "OP_TARGET": self.op.node_name,
1442 "NODE_NAME": self.op.node_name,
1443 }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # promote other nodes to master candidate as needed and drop the node
    # from the cluster context
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.cfg, self.node)

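    # run post hooks on the node before it's removed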
    RunPostHook(self, self.node.name)

    # tell the node to leave the cluster and clean up after itself
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # remove the node from the candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # remove the node from /etc/hosts on the master, if managed
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

1571 """Check prerequisites.
1572
1573 """
1574 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1575
1576
1577 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1578 if not inst.disks_active:
1579 continue
1580 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
1581 check_nodes.discard(self.op.node_uuid)
1582 for inst_node_uuid in check_nodes:
1583 self._CheckFaultyDisks(inst, inst_node_uuid)
1584
1585 - def Exec(self, feedback_fn):
1596