31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 from ganeti import qlang
43 from ganeti import query
44 from ganeti import rpc
45 from ganeti import utils
46 from ganeti.masterd import iallocator
47
48 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
49 ResultWithJobs
50 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
51 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
52 IsExclusiveStorageEnabledNode, CheckNodePVs, \
53 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
54 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
55 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
56 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
57 FindFaultyInstanceDisks, CheckStorageTypeEnabled
58
59
69
70
72 """Ensure that a node has the given secondary ip.
73
74 @type lu: L{LogicalUnit}
75 @param lu: the LU on behalf of which we make the check
76 @type node: L{objects.Node}
77 @param node: the node to check
78 @type secondary_ip: string
79 @param secondary_ip: the ip to check
80 @type prereq: boolean
81 @param prereq: whether to throw a prerequisite or an execute error
82 @raise errors.OpPrereqError: if the node doesn't have the ip,
83 and prereq=True
84 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
85
86 """
87
88
89 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
90 result.Raise("Failure checking secondary ip on node %s" % node.name,
91 prereq=prereq, ecode=errors.ECODE_ENVIRON)
92 if not result.payload:
93 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
94 " please fix and re-run this command" % secondary_ip)
95 if prereq:
96 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
97 else:
98 raise errors.OpExecError(msg)
99
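# Illustrative usage sketch (not part of the original module): an LU that
# verifies a node's secondary IP could call the helper above from its
# prerequisite checks, assuming `self` is a LogicalUnit and `node` is an
# objects.Node already read from the configuration.  With prereq=True a
# failure surfaces as OpPrereqError, with prereq=False as OpExecError.
#
#   def CheckPrereq(self):
#     node = self.cfg.GetNodeInfo(self.op.node_uuid)
#     if node.secondary_ip != node.primary_ip:
#       _CheckNodeHasSecondaryIP(self, node, node.secondary_ip, True)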


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)
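    # Illustrative note (not in the original source): the block above keeps
    # the cluster uniformly single- or dual-homed.  For example, assuming a
    # master whose primary and secondary IP are both "192.0.2.10"
    # (single-homed), a new node supplying a distinct secondary IP such as
    # "198.51.100.5" is rejected here, and the reverse case likewise.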

    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    self.new_node.powered = True

    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
          master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
          self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      }

    result = self.rpc.call_node_verify(
        node_verifier_uuids, node_verify_param,
        self.cfg.GetClusterName(),
        self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      self.cfg.Update(self.new_node, feedback_fn)
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

428 """Modifies the parameters of a node.
429
430 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
431 to the node role (as _ROLE_*)
432 @cvar _R2F: a dictionary from node role to tuples of flags
433 @cvar _FLAGS: a list of attribute names corresponding to the flags
434
435 """
436 HPATH = "node-modify"
437 HTYPE = constants.HTYPE_NODE
438 REQ_BGL = False
439 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
440 _F2R = {
441 (True, False, False): _ROLE_CANDIDATE,
442 (False, True, False): _ROLE_DRAINED,
443 (False, False, True): _ROLE_OFFLINE,
444 (False, False, False): _ROLE_REGULAR,
445 }
446 _R2F = dict((v, k) for k, v in _F2R.items())
447 _FLAGS = ["master_candidate", "drained", "offline"]
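  # Illustrative note (not in the original source): _F2R maps the
  # (master_candidate, drained, offline) flag tuple to a role constant and
  # _R2F is its inverse, e.g.
  #   _F2R[(True, False, False)] == _ROLE_CANDIDATE
  #   _R2F[_ROLE_DRAINED] == (False, True, False)
  # while _FLAGS lists the node attribute names in the same tuple order.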

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      new_role = self._ROLE_REGULAR
    else:
      new_role = old_role

    self.new_role = new_role
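    # Illustrative note (not in the original source): since at most one flag
    # may be True, the chain above is deterministic; e.g. drained=True yields
    # _ROLE_DRAINED, while passing offline=False on an offline node (unsetting
    # the only True flag) falls through to _ROLE_REGULAR.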

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    if self.op.secondary_ip:
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    self.cfg.Update(node, feedback_fn)

    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

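    # Illustrative note (not in the original source): the list returned below
    # contains (parameter, new value) pairs describing what actually changed,
    # e.g. [("master_candidate", "False"), ("secondary_ip", "198.51.100.7")],
    # which callers such as the command-line client typically print back to
    # the user as feedback.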
    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

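    # Illustrative note (not in the original source): the result below is the
    # evacuated node plus the candidate target nodes, e.g. the whole node
    # group when the iallocator picks targets, or just
    # {node_uuid, remote_node_uuid} when an explicit secondary was given.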
    return set([self.op.node_uuid]) | group_nodes

  def CheckPrereq(self):
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      jobs = []

    elif self.op.iallocator is not None:
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

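    # Illustrative note (not in the original source): `jobs` is a list of job
    # definitions, each itself a list of opcodes.  In the explicit remote-node
    # branch above every instance gets its own single-opcode job, so the
    # resulting jobs can be scheduled and fail independently of each other.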
    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def Exec(self, feedback_fn):
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    if query.NQ_LIVE in self.requested_data:
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
      raw_storage_units = utils.storage.GetStorageUnits(
          lu.cfg, [default_template])
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.items())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

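    # Illustrative note (not in the original source): when NQ_INST data was
    # requested, node_to_primary and node_to_secondary map each node UUID to
    # the set of instance UUIDs using it, e.g.
    #   {"node1-uuid": {"inst1-uuid"}, "node2-uuid": set()}
    # and inst_uuid_to_inst_name translates those UUIDs back to instance
    # names for display; otherwise all three stay None.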
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
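
# Illustrative usage sketch (not part of the original module): callers of
# _CheckOutputFields would validate the user-requested output fields against
# the set of legal field names, for example (constant names assumed):
#
#   _CheckOutputFields(utils.FieldSet(constants.SF_NODE, constants.SF_TYPE,
#                                     *constants.VALID_STORAGE_FIELDS),
#                      self.op.output_fields)
#
# which raises OpPrereqError listing any unknown fields.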


class LUNodeQueryvol(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
      constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type
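  # Illustrative note (not in the original source): the default storage type
  # is derived from the first enabled disk template; for instance, with
  # enabled_disk_templates starting with "plain" or "drbd" the mapping yields
  # the LVM volume-group storage type, while a file-based default template
  # maps to file storage.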

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
          "Storage reporting for storage type '%s' is not supported. Please"
          " use the --storage-type option to specify one of the supported"
          " storage types (%s) or set the default disk template to one that"
          " supports storage reporting." %
          (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

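    # Illustrative note (not in the original source): each entry of the list
    # returned below is one row per storage unit, ordered like
    # self.op.output_fields; e.g. for output_fields == ["node", "name", "size"]
    # a row might look like ["node1.example.com", "xenvg", 102400], with the
    # "node" and "type" values filled in locally and the rest taken from the
    # storage_list RPC payload.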
    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def Exec(self, feedback_fn):


class LUNodeRepairStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):