"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
  FindFaultyInstanceDisks
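

# Minimal sketch of the self-promotion helper used by LUNodeAdd and
# LUNodeSetParams below (assumed implementation, based on the standard
# candidate-pool accounting in the cluster configuration).
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether the node should promote itself to master candidate."""
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # One more candidate is only useful while the pool is not yet full.
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should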


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
    post_nodes = pre_nodes + [self.op.node_name, ]

    return (pre_nodes, post_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)
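
    # The new node's network setup must match the cluster's: a single-homed
    # node cannot join a dual-homed cluster and vice versa.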
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
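
    # Verify that noded on the new node answers and speaks the same protocol
    # version as the master.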
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    new_node.powered = True
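
    # Apply the decisions taken in CheckPrereq to the node object: reset the
    # offline/drained flags on re-add, record the master-candidate role and
    # copy the capability flags from the opcode.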
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}

    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      new_node.disk_state_static = self.new_disk_state
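
    # Add the new node to /etc/hosts on the master, if the cluster is
    # configured to manage that file.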
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
                               False)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([node], {}),
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      self.cfg.Update(new_node, feedback_fn)
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      RedistributeAncillaryFiles(self, additional_nodes=[node],
                                 additional_vm=self.op.vm_capable)
      self.context.AddNode(new_node, self.proc.GetECId())


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None
442
449
478
480 """Build hooks env.
481
482 This runs on the master node.
483
484 """
485 return {
486 "OP_TARGET": self.op.node_name,
487 "MASTER_CANDIDATE": str(self.op.master_candidate),
488 "OFFLINE": str(self.op.offline),
489 "DRAINED": str(self.op.drained),
490 "MASTER_CAPABLE": str(self.op.master_capable),
491 "VM_CAPABLE": str(self.op.vm_capable),
492 }
493
495 """Build hooks nodes.
496
497 """
498 nl = [self.cfg.GetMasterNode(), self.op.node_name]
499 return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instances = frozenset(affected_instances.keys())
      if wanted_instances - owned_instances:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (self.op.node_name,
                                    utils.CommaJoin(wanted_instances),
                                    utils.CommaJoin(owned_instances)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]
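
    # Requests to clear a flag that is already cleared are silently dropped.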
    for attr in self._FLAGS:
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)
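
    # Out-of-band handling: a powered-down node must be powered on before its
    # offline flag can be cleared, and the powered state can only be changed
    # at all if the node supports OOB.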
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False
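
    # Compute the new role from the requested flag changes; at most one of
    # the flags can be set to True at this point (enforced in CheckArguments).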
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      new_role = self._ROLE_REGULAR
    else:
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      result = self.rpc.call_version([node.name])[node.name]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    if self.op.secondary_ip:
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
        if master.name != node.name:
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node
    old_role = self.old_role
    new_role = self.new_role

    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if new_role != old_role:
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
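
    # If the whole cluster was locked for auto-promotion, re-balance the
    # master-candidate pool now that this node's role has changed.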
    if self.lock_all:
      AdjustCandidatePool(self, [node.name])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    self.cfg.Update(node, feedback_fn)

    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
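

# Minimal sketch of the helper used by the node/instance lookup functions
# below (assumed implementation: instances are filtered straight out of the
# cluster configuration).
def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]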


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def _DetermineNodes(self):
    """Gets the list of nodes to operate on.

    """
    if self.op.remote_node is None:
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
    else:
      group_nodes = frozenset([self.op.remote_node])

    return set([self.op.node_name]) | group_nodes

  def CheckPrereq(self):
    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instances:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instances)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
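
    # Build the evacuation jobs: either ask the iallocator for a relocation
    # solution, or generate replace-disks jobs that move the affected DRBD
    # secondaries to the given remote node.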
    if not self.instance_names:
      jobs = []

    elif self.op.iallocator is not None:
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def ExpandNames(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def Exec(self, feedback_fn):
    allow_runtime_changes = self.op.allow_runtime_changes
    jobs = [
      [opcodes.OpInstanceMigrate(instance_name=inst.name,
                                 mode=self.op.mode,
                                 live=self.op.live,
                                 iallocator=self.op.iallocator,
                                 target_node=self.op.target_node,
                                 allow_runtime_changes=allow_runtime_changes,
                                 ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_name]))

    return ResultWithJobs(jobs)
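

# Minimal sketch of the helper used by the storage LUs below (assumed
# implementation: only file-based storage needs extra arguments, namely the
# configured file storage directory).
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type."""
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  return []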


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      self.wanted = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
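
    # If live data was requested, query it only from vm_capable nodes and
    # silently skip nodes that fail to answer.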
    if query.NQ_LIVE in self.requested_data:
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
                                        [lu.cfg.GetHypervisorType()], es_flags)
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  REQ_BGL = False

  def Exec(self, feedback_fn):


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvol(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceDisksToNodes(ilist.values())

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    self.nodes = self.owned_locks(locking.LEVEL_NODE)

    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def Exec(self, feedback_fn):


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):