22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40 ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43 IsExclusiveStorageEnabledNode, CheckNodePVs, \
44 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48 FindFaultyInstanceDisks, CheckStorageTypeEnabled
49
50
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63 """Ensure that a node has the given secondary ip.
64
65 @type lu: L{LogicalUnit}
66 @param lu: the LU on behalf of which we make the check
67 @type node: L{objects.Node}
68 @param node: the node to check
69 @type secondary_ip: string
70 @param secondary_ip: the ip to check
71 @type prereq: boolean
72 @param prereq: whether to throw a prerequisite or an execute error
73 @raise errors.OpPrereqError: if the node doesn't have the ip,
74 and prereq=True
75 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77 """
78
79
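# Ask the node daemon over RPC whether it has the given address configured;
# result.payload is a boolean, and result.Raise() turns RPC-level failures
# into the error type selected by the prereq flag.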
80 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81 result.Raise("Failure checking secondary ip on node %s" % node.name,
82 prereq=prereq, ecode=errors.ECODE_ENVIRON)
83 if not result.payload:
84 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85 " please fix and re-run this command" % secondary_ip)
86 if prereq:
87 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88 else:
89 raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93 """Logical unit for adding node to the cluster.
94
95 """
96 HPATH = "node-add"
97 HTYPE = constants.HTYPE_NODE
98 _NFLAGS = ["master_capable", "vm_capable"]
99
114
115 def BuildHooksEnv(self):
116 """Build hooks env.
117
118 This will run on all nodes before, and on all nodes + the new node after.
119
120 """
121 return {
122 "OP_TARGET": self.op.node_name,
123 "NODE_NAME": self.op.node_name,
124 "NODE_PIP": self.op.primary_ip,
125 "NODE_SIP": self.op.secondary_ip,
126 "MASTER_CAPABLE": str(self.op.master_capable),
127 "VM_CAPABLE": str(self.op.vm_capable),
128 }
129
130 def BuildHooksNodes(self):
131 """Build hooks nodes.
132
133 """
134 hook_nodes = self.cfg.GetNodeList()
135 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136 if new_node_info is not None:
137
138 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140
141 return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143 def CheckPrereq(self):
144 """Check prerequisites.
145
146 This checks:
147 - the new node is not already in the config
148 - it is resolvable
149 - its parameters (single/dual homed) matches the cluster
150
151 Any errors are signaled by raising errors.OpPrereqError.
152
153 """
154 node_name = self.hostname.name
155 self.op.primary_ip = self.hostname.ip
156 if self.op.secondary_ip is None:
157 if self.primary_ip_family == netutils.IP6Address.family:
158 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159 " IPv4 address must be given as secondary",
160 errors.ECODE_INVAL)
161 self.op.secondary_ip = self.op.primary_ip
162
163 secondary_ip = self.op.secondary_ip
164 if not netutils.IP4Address.IsValid(secondary_ip):
165 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166 " address" % secondary_ip, errors.ECODE_INVAL)
167
168 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169 if not self.op.readd and existing_node_info is not None:
170 raise errors.OpPrereqError("Node %s is already in the configuration" %
171 node_name, errors.ECODE_EXISTS)
172 elif self.op.readd and existing_node_info is None:
173 raise errors.OpPrereqError("Node %s is not in the configuration" %
174 node_name, errors.ECODE_NOENT)
175
176 self.changed_primary_ip = False
177
178 for existing_node in self.cfg.GetAllNodesInfo().values():
179 if self.op.readd and node_name == existing_node.name:
180 if existing_node.secondary_ip != secondary_ip:
181 raise errors.OpPrereqError("Readded node doesn't have the same IP"
182 " address configuration as before",
183 errors.ECODE_INVAL)
184 if existing_node.primary_ip != self.op.primary_ip:
185 self.changed_primary_ip = True
186
187 continue
188
189 if (existing_node.primary_ip == self.op.primary_ip or
190 existing_node.secondary_ip == self.op.primary_ip or
191 existing_node.primary_ip == secondary_ip or
192 existing_node.secondary_ip == secondary_ip):
193 raise errors.OpPrereqError("New node ip address(es) conflict with"
194 " existing node %s" % existing_node.name,
195 errors.ECODE_NOTUNIQUE)
196
197
198
199 if self.op.readd:
200 assert existing_node_info is not None, \
201 "Can't retrieve locked node %s" % node_name
202 for attr in self._NFLAGS:
203 if getattr(self.op, attr) is None:
204 setattr(self.op, attr, getattr(existing_node_info, attr))
205 else:
206 for attr in self._NFLAGS:
207 if getattr(self.op, attr) is None:
208 setattr(self.op, attr, True)
209
210 if self.op.readd and not self.op.vm_capable:
211 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
212 if pri or sec:
213 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214 " flag set to false, but it already holds"
215 " instances" % node_name,
216 errors.ECODE_STATE)
217
218
219
220 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221 master_singlehomed = myself.secondary_ip == myself.primary_ip
222 newbie_singlehomed = secondary_ip == self.op.primary_ip
223 if master_singlehomed != newbie_singlehomed:
224 if master_singlehomed:
225 raise errors.OpPrereqError("The master has no secondary ip but the"
226 " new node has one",
227 errors.ECODE_INVAL)
228 else:
229 raise errors.OpPrereqError("The master has a secondary ip but the"
230 " new node doesn't have one",
231 errors.ECODE_INVAL)
232
233
234 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235 raise errors.OpPrereqError("Node not reachable by ping",
236 errors.ECODE_ENVIRON)
237
238 if not newbie_singlehomed:
239
240 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241 source=myself.secondary_ip):
242 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243 " based ping to node daemon port",
244 errors.ECODE_ENVIRON)
245
246 if self.op.readd:
247 exceptions = [existing_node_info.uuid]
248 else:
249 exceptions = []
250
251 if self.op.master_capable:
252 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
253 else:
254 self.master_candidate = False
255
256 if self.op.readd:
257 self.new_node = existing_node_info
258 else:
259 node_group = self.cfg.LookupNodeGroup(self.op.group)
260 self.new_node = objects.Node(name=node_name,
261 primary_ip=self.op.primary_ip,
262 secondary_ip=secondary_ip,
263 master_candidate=self.master_candidate,
264 offline=False, drained=False,
265 group=node_group, ndparams={})
266
267 if self.op.ndparams:
268 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270 "node", "cluster or group")
271
272 if self.op.hv_state:
273 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274
275 if self.op.disk_state:
276 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277
278
279
280 rpcrunner = rpc.DnsOnlyRunner()
281 result = rpcrunner.call_version([node_name])[node_name]
282 result.Raise("Can't get version information from node %s" % node_name)
283 if constants.PROTOCOL_VERSION == result.payload:
284 logging.info("Communication to node %s fine, sw version %s match",
285 node_name, result.payload)
286 else:
287 raise errors.OpPrereqError("Version mismatch master version %s,"
288 " node version %s" %
289 (constants.PROTOCOL_VERSION, result.payload),
290 errors.ECODE_ENVIRON)
291
292 vg_name = self.cfg.GetVGName()
293 if vg_name is not None:
294 vparams = {constants.NV_PVLIST: [vg_name]}
295 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296 cname = self.cfg.GetClusterName()
297 result = rpcrunner.call_node_verify_light(
298 [node_name], vparams, cname,
299 self.cfg.GetClusterInfo().hvparams)[node_name]
300 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
301 if errmsgs:
302 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303 "; ".join(errmsgs), errors.ECODE_ENVIRON)
304
305 def Exec(self, feedback_fn):
306 """Adds the new node to the cluster.
307
308 """
309 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
310 "Not owning BGL"
311
312
313 self.new_node.powered = True
314
315
316
317
318
319 if self.op.readd:
320 self.new_node.offline = False
321 self.new_node.drained = False
322 self.LogInfo("Readding a node, the offline/drained flags were reset")
323
324 self.new_node.master_candidate = self.master_candidate
325 if self.changed_primary_ip:
326 self.new_node.primary_ip = self.op.primary_ip
327
328
329 for attr in self._NFLAGS:
330 setattr(self.new_node, attr, getattr(self.op, attr))
331
332
333 if self.new_node.master_candidate:
334 self.LogInfo("Node will be a master candidate")
335
336 if self.op.ndparams:
337 self.new_node.ndparams = self.op.ndparams
338 else:
339 self.new_node.ndparams = {}
340
341 if self.op.hv_state:
342 self.new_node.hv_state_static = self.new_hv_state
343
344 if self.op.disk_state:
345 self.new_node.disk_state_static = self.new_disk_state
346
347
348 if self.cfg.GetClusterInfo().modify_etc_hosts:
349 master_node = self.cfg.GetMasterNode()
350 result = self.rpc.call_etc_hosts_modify(
351 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
352 self.hostname.ip)
353 result.Raise("Can't update hosts file with new host data")
354
355 if self.new_node.secondary_ip != self.new_node.primary_ip:
356 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
357 False)
358
359 node_verifier_uuids = [self.cfg.GetMasterNode()]
360 node_verify_param = {
361 constants.NV_NODELIST: ([self.new_node.name], {}),
362
363 }
364
365 result = self.rpc.call_node_verify(
366 node_verifier_uuids, node_verify_param,
367 self.cfg.GetClusterName(),
368 self.cfg.GetClusterInfo().hvparams)
369 for verifier in node_verifier_uuids:
370 result[verifier].Raise("Cannot communicate with node %s" % verifier)
371 nl_payload = result[verifier].payload[constants.NV_NODELIST]
372 if nl_payload:
373 for failed in nl_payload:
374 feedback_fn("ssh/hostname verification failed"
375 " (checking from %s): %s" %
376 (verifier, nl_payload[failed]))
377 raise errors.OpExecError("ssh/hostname verification failed")
378
379 if self.op.readd:
380 self.context.ReaddNode(self.new_node)
381 RedistributeAncillaryFiles(self)
382
383 self.cfg.Update(self.new_node, feedback_fn)
384
385 if not self.new_node.master_candidate:
386 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
387 result.Warn("Node failed to demote itself from master candidate status",
388 self.LogWarning)
389 else:
390 self.context.AddNode(self.new_node, self.proc.GetECId())
391 RedistributeAncillaryFiles(self)
392
393
394 class LUNodeSetParams(LogicalUnit):
395 """Modifies the parameters of a node.
396
397 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
398 to the node role (as _ROLE_*)
399 @cvar _R2F: a dictionary from node role to tuples of flags
400 @cvar _FLAGS: a list of attribute names corresponding to the flags
401
402 """
403 HPATH = "node-modify"
404 HTYPE = constants.HTYPE_NODE
405 REQ_BGL = False
406 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
407 _F2R = {
408 (True, False, False): _ROLE_CANDIDATE,
409 (False, True, False): _ROLE_DRAINED,
410 (False, False, True): _ROLE_OFFLINE,
411 (False, False, False): _ROLE_REGULAR,
412 }
413 _R2F = dict((v, k) for k, v in _F2R.items())
414 _FLAGS = ["master_candidate", "drained", "offline"]
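# A node's role is a function of the three mutually exclusive flags above:
# e.g. (True, False, False) maps to "master candidate" and (False, False,
# False) to a regular node; _F2R and _R2F translate between the two forms.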
415
416 def CheckArguments(self):
417 (self.op.node_uuid, self.op.node_name) = \
418 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
419 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
420 self.op.master_capable, self.op.vm_capable,
421 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
422 self.op.disk_state]
423 if all_mods.count(None) == len(all_mods):
424 raise errors.OpPrereqError("Please pass at least one modification",
425 errors.ECODE_INVAL)
426 if all_mods.count(True) > 1:
427 raise errors.OpPrereqError("Can't set the node into more than one"
428 " state at the same time",
429 errors.ECODE_INVAL)
430
431
432 self.might_demote = (self.op.master_candidate is False or
433 self.op.offline is True or
434 self.op.drained is True or
435 self.op.master_capable is False)
436
437 if self.op.secondary_ip:
438 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
439 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
440 " address" % self.op.secondary_ip,
441 errors.ECODE_INVAL)
442
443 self.lock_all = self.op.auto_promote and self.might_demote
444 self.lock_instances = self.op.secondary_ip is not None
445
452
482
483 def BuildHooksEnv(self):
484 """Build hooks env.
485
486 This runs on the master node.
487
488 """
489 return {
490 "OP_TARGET": self.op.node_name,
491 "MASTER_CANDIDATE": str(self.op.master_candidate),
492 "OFFLINE": str(self.op.offline),
493 "DRAINED": str(self.op.drained),
494 "MASTER_CAPABLE": str(self.op.master_capable),
495 "VM_CAPABLE": str(self.op.vm_capable),
496 }
497
498 def BuildHooksNodes(self):
499 """Build hooks nodes.
500
501 """
502 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
503 return (nl, nl)
504
505 def CheckPrereq(self):
506 """Check prerequisites.
507
508 This only checks the instance list against the existing names.
509
510 """
511 node = self.cfg.GetNodeInfo(self.op.node_uuid)
512 if self.lock_instances:
513 affected_instances = \
514 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
515
516
517 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
518 wanted_instance_names = frozenset([inst.name for inst in
519 affected_instances.values()])
520 if wanted_instance_names - owned_instance_names:
521 raise errors.OpPrereqError("Instances affected by changing node %s's"
522 " secondary IP address have changed since"
523 " locks were acquired, wanted '%s', have"
524 " '%s'; retry the operation" %
525 (node.name,
526 utils.CommaJoin(wanted_instance_names),
527 utils.CommaJoin(owned_instance_names)),
528 errors.ECODE_STATE)
529 else:
530 affected_instances = None
531
532 if (self.op.master_candidate is not None or
533 self.op.drained is not None or
534 self.op.offline is not None):
535
536 if node.uuid == self.cfg.GetMasterNode():
537 raise errors.OpPrereqError("The master role can be changed"
538 " only via master-failover",
539 errors.ECODE_INVAL)
540
541 if self.op.master_candidate and not node.master_capable:
542 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
543 " it a master candidate" % node.name,
544 errors.ECODE_STATE)
545
546 if self.op.vm_capable is False:
547 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
548 if ipri or isec:
549 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
550 " the vm_capable flag" % node.name,
551 errors.ECODE_STATE)
552
553 if node.master_candidate and self.might_demote and not self.lock_all:
554 assert not self.op.auto_promote, "auto_promote set but lock_all not"
555
556
557 (mc_remaining, mc_should, _) = \
558 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
559 if mc_remaining < mc_should:
560 raise errors.OpPrereqError("Not enough master candidates, please"
561 " pass auto promote option to allow"
562 " promotion (--auto-promote or RAPI"
563 " auto_promote=True)", errors.ECODE_STATE)
564
565 self.old_flags = old_flags = (node.master_candidate,
566 node.drained, node.offline)
567 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
568 self.old_role = old_role = self._F2R[old_flags]
569
570
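# Requests to clear a flag that is already cleared are dropped here so that
# they do not influence the new-role computation further down.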
571 for attr in self._FLAGS:
572 if getattr(self.op, attr) is False and getattr(node, attr) is False:
573 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
574 setattr(self.op, attr, None)
575
576
577
578
579
580 if SupportsOob(self.cfg, node):
581 if self.op.offline is False and not (node.powered or
582 self.op.powered is True):
583 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
584 " offline status can be reset") %
585 self.op.node_name, errors.ECODE_STATE)
586 elif self.op.powered is not None:
587 raise errors.OpPrereqError(("Unable to change powered state for node %s"
588 " as it does not support out-of-band"
589 " handling") % self.op.node_name,
590 errors.ECODE_STATE)
591
592
593 if (self.op.drained is False or self.op.offline is False or
594 (self.op.master_capable and not node.master_capable)):
595 if _DecideSelfPromotion(self):
596 self.op.master_candidate = True
597 self.LogInfo("Auto-promoting node to master candidate")
598
599
600 if self.op.master_capable is False and node.master_candidate:
601 self.LogInfo("Demoting from master candidate")
602 self.op.master_candidate = False
603
604
605 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
606 if self.op.master_candidate:
607 new_role = self._ROLE_CANDIDATE
608 elif self.op.drained:
609 new_role = self._ROLE_DRAINED
610 elif self.op.offline:
611 new_role = self._ROLE_OFFLINE
612 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
613
614
615 new_role = self._ROLE_REGULAR
616 else:
617 new_role = old_role
618
619 self.new_role = new_role
620
621 if old_role == self._ROLE_OFFLINE and new_role != old_role:
622
623 result = self.rpc.call_version([node.uuid])[node.uuid]
624 if result.fail_msg:
625 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
626 " to report its version: %s" %
627 (node.name, result.fail_msg),
628 errors.ECODE_STATE)
629 else:
630 self.LogWarning("Transitioning node from offline to online state"
631 " without using re-add. Please make sure the node"
632 " is healthy!")
633
634
635
636
637 if self.op.secondary_ip:
638
639 master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
640 master_singlehomed = master.secondary_ip == master.primary_ip
641 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
642 if self.op.force and node.uuid == master.uuid:
643 self.LogWarning("Transitioning from single-homed to multi-homed"
644 " cluster; all nodes will require a secondary IP"
645 " address")
646 else:
647 raise errors.OpPrereqError("Changing the secondary ip on a"
648 " single-homed cluster requires the"
649 " --force option to be passed, and the"
650 " target node to be the master",
651 errors.ECODE_INVAL)
652 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
653 if self.op.force and node.uuid == master.uuid:
654 self.LogWarning("Transitioning from multi-homed to single-homed"
655 " cluster; secondary IP addresses will have to be"
656 " removed")
657 else:
658 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
659 " same as the primary IP on a multi-homed"
660 " cluster, unless the --force option is"
661 " passed, and the target node is the"
662 " master", errors.ECODE_INVAL)
663
664 assert not (set([inst.name for inst in affected_instances.values()]) -
665 self.owned_locks(locking.LEVEL_INSTANCE))
666
667 if node.offline:
668 if affected_instances:
669 msg = ("Cannot change secondary IP address: offline node has"
670 " instances (%s) configured to use it" %
671 utils.CommaJoin(
672 [inst.name for inst in affected_instances.values()]))
673 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
674 else:
675
676
677 for instance in affected_instances.values():
678 CheckInstanceState(self, instance, INSTANCE_DOWN,
679 msg="cannot change secondary ip")
680
681 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
682 if master.uuid != node.uuid:
683
684 if not netutils.TcpPing(self.op.secondary_ip,
685 constants.DEFAULT_NODED_PORT,
686 source=master.secondary_ip):
687 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
688 " based ping to node daemon port",
689 errors.ECODE_ENVIRON)
690
691 if self.op.ndparams:
692 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
693 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
694 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
695 "node", "cluster or group")
696 self.new_ndparams = new_ndparams
697
698 if self.op.hv_state:
699 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
700 node.hv_state_static)
701
702 if self.op.disk_state:
703 self.new_disk_state = \
704 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
705
706 def Exec(self, feedback_fn):
707 """Modifies a node.
708
709 """
710 node = self.cfg.GetNodeInfo(self.op.node_uuid)
711 result = []
712
713 if self.op.ndparams:
714 node.ndparams = self.new_ndparams
715
716 if self.op.powered is not None:
717 node.powered = self.op.powered
718
719 if self.op.hv_state:
720 node.hv_state_static = self.new_hv_state
721
722 if self.op.disk_state:
723 node.disk_state_static = self.new_disk_state
724
725 for attr in ["master_capable", "vm_capable"]:
726 val = getattr(self.op, attr)
727 if val is not None:
728 setattr(node, attr, val)
729 result.append((attr, str(val)))
730
731 if self.new_role != self.old_role:
732
733 if self.old_role == self._ROLE_CANDIDATE and \
734 self.new_role != self._ROLE_OFFLINE:
735 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
736 if msg:
737 self.LogWarning("Node failed to demote itself: %s", msg)
738
739 new_flags = self._R2F[self.new_role]
740 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
741 if of != nf:
742 result.append((desc, str(nf)))
743 (node.master_candidate, node.drained, node.offline) = new_flags
744
745
746 if self.lock_all:
747 AdjustCandidatePool(self, [node.uuid])
748
749 if self.op.secondary_ip:
750 node.secondary_ip = self.op.secondary_ip
751 result.append(("secondary_ip", self.op.secondary_ip))
752
753
754 self.cfg.Update(node, feedback_fn)
755
756
757
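# Re-add the node in the LU context only when exactly one of the old and
# new roles is "master candidate", i.e. when its candidacy changed.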
758 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
759 self.context.ReaddNode(node)
760
761 return result
762
763
764 class LUNodePowercycle(NoHooksLU):
765 """Powercycles a node.
766
767 """
768 REQ_BGL = False
769
778
779 def ExpandNames(self):
780 """Locking for PowercycleNode.
781
782 This is a last-resort option and shouldn't block on other
783 jobs. Therefore, we grab no locks.
784
785 """
786 self.needed_locks = {}
787
788 def Exec(self, feedback_fn):
789 """Reboots a node.
790
791 """
792 default_hypervisor = self.cfg.GetHypervisorType()
793 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
794 result = self.rpc.call_node_powercycle(self.op.node_uuid,
795 default_hypervisor,
796 hvparams)
797 result.Raise("Failed to schedule the reboot")
798 return result.payload
799
800
801 def _GetNodeInstancesInner(cfg, fn):
802 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]  # restored from the call sites below
803
804
805 def _GetNodePrimaryInstances(cfg, node_uuid):
806 """Returns primary instances on a node.
807
808 """
809 return _GetNodeInstancesInner(cfg,
810 lambda inst: node_uuid == inst.primary_node)
811
812
819
820
821 def _GetNodeInstances(cfg, node_uuid):
822 """Returns a list of all primary and secondary instances on a node.
823
824 """
825
826 return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
827
828
829 class LUNodeEvacuate(NoHooksLU):
830 """Evacuates instances off a list of nodes.
831
832 """
833 REQ_BGL = False
834
835 _MODE2IALLOCATOR = {
836 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
837 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
838 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
839 }
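# Node evacuation modes map one-to-one onto iallocator evacuation modes;
# the assertions below make sure the mapping stays complete on both sides.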
840 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
841 assert (frozenset(_MODE2IALLOCATOR.values()) ==
842 constants.IALLOCATOR_NEVAC_MODES)
843
844 def CheckArguments(self):
845 CheckIAllocatorOrNode(self, "iallocator", "remote_node")  # restored; enforces the iallocator-xor-remote_node invariant asserted in Exec
846
847 def ExpandNames(self):
848 (self.op.node_uuid, self.op.node_name) = \
849 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
850
851 if self.op.remote_node is not None:
852 (self.op.remote_node_uuid, self.op.remote_node) = \
853 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
854 self.op.remote_node)
855 assert self.op.remote_node
856
857 if self.op.node_uuid == self.op.remote_node_uuid:
858 raise errors.OpPrereqError("Can not use evacuated node as a new"
859 " secondary node", errors.ECODE_INVAL)
860
861 if self.op.mode != constants.NODE_EVAC_SEC:
862 raise errors.OpPrereqError("Without the use of an iallocator only"
863 " secondary instances can be evacuated",
864 errors.ECODE_INVAL)
865
866
867 self.share_locks = ShareAll()
868 self.needed_locks = {
869 locking.LEVEL_INSTANCE: [],
870 locking.LEVEL_NODEGROUP: [],
871 locking.LEVEL_NODE: [],
872 }
873
874
875
876 self.lock_nodes = self._DetermineNodes()
877
878 def _DetermineNodes(self):
879 """Gets the list of node UUIDs to operate on.
880
881 """
882 if self.op.remote_node is None:
883
884 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
885 else:
886 group_nodes = frozenset([self.op.remote_node_uuid])
887
888
889 return set([self.op.node_uuid]) | group_nodes
890
919
935
936 def CheckPrereq(self):
937
938 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
939 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
940 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
941
942 need_nodes = self._DetermineNodes()
943
944 if not owned_nodes.issuperset(need_nodes):
945 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
946 " locks were acquired, current nodes are"
947 " are '%s', used to be '%s'; retry the"
948 " operation" %
949 (self.op.node_name,
950 utils.CommaJoin(need_nodes),
951 utils.CommaJoin(owned_nodes)),
952 errors.ECODE_STATE)
953
954 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
955 if owned_groups != wanted_groups:
956 raise errors.OpExecError("Node groups changed since locks were acquired,"
957 " current groups are '%s', used to be '%s';"
958 " retry the operation" %
959 (utils.CommaJoin(wanted_groups),
960 utils.CommaJoin(owned_groups)))
961
962
963 self.instances = self._DetermineInstances()
964 self.instance_names = [i.name for i in self.instances]
965
966 if set(self.instance_names) != owned_instance_names:
967 raise errors.OpExecError("Instances on node '%s' changed since locks"
968 " were acquired, current instances are '%s',"
969 " used to be '%s'; retry the operation" %
970 (self.op.node_name,
971 utils.CommaJoin(self.instance_names),
972 utils.CommaJoin(owned_instance_names)))
973
974 if self.instance_names:
975 self.LogInfo("Evacuating instances from node '%s': %s",
976 self.op.node_name,
977 utils.CommaJoin(utils.NiceSort(self.instance_names)))
978 else:
979 self.LogInfo("No instances to evacuate from node '%s'",
980 self.op.node_name)
981
982 if self.op.remote_node is not None:
983 for i in self.instances:
984 if i.primary_node == self.op.remote_node_uuid:
985 raise errors.OpPrereqError("Node %s is the primary node of"
986 " instance %s, cannot use it as"
987 " secondary" %
988 (self.op.remote_node, i.name),
989 errors.ECODE_INVAL)
990
991 def Exec(self, feedback_fn):
992 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
993
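# Either let the iallocator compute the whole evacuation, or (secondary-only
# mode with an explicit remote node) submit one replace-disks job per
# instance; an empty instance list simply yields no jobs.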
994 if not self.instance_names:
995
996 jobs = []
997
998 elif self.op.iallocator is not None:
999
1000 evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1001 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1002 instances=list(self.instance_names))
1003 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1004
1005 ial.Run(self.op.iallocator)
1006
1007 if not ial.success:
1008 raise errors.OpPrereqError("Can't compute node evacuation using"
1009 " iallocator '%s': %s" %
1010 (self.op.iallocator, ial.info),
1011 errors.ECODE_NORES)
1012
1013 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1014
1015 elif self.op.remote_node is not None:
1016 assert self.op.mode == constants.NODE_EVAC_SEC
1017 jobs = [
1018 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1019 remote_node=self.op.remote_node,
1020 disks=[],
1021 mode=constants.REPLACE_DISK_CHG,
1022 early_release=self.op.early_release)]
1023 for instance_name in self.instance_names]
1024
1025 else:
1026 raise errors.ProgrammerError("No iallocator or remote node")
1027
1028 return ResultWithJobs(jobs)
1029
1030
1031 class LUNodeMigrate(LogicalUnit):
1032 """Migrate all instances from a node.
1033
1034 """
1035 HPATH = "node-migrate"
1036 HTYPE = constants.HTYPE_NODE
1037 REQ_BGL = False
1038
1041
1042 def ExpandNames(self):
1043 (self.op.node_uuid, self.op.node_name) = \
1044 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1045
1046 self.share_locks = ShareAll()
1047 self.needed_locks = {
1048 locking.LEVEL_NODE: [self.op.node_uuid],
1049 }
1050
1051 def BuildHooksEnv(self):
1052 """Build hooks env.
1053
1054 This runs on the master, the primary and all the secondaries.
1055
1056 """
1057 return {
1058 "NODE_NAME": self.op.node_name,
1059 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1060 }
1061
1062 def BuildHooksNodes(self):
1063 """Build hooks nodes.
1064
1065 """
1066 nl = [self.cfg.GetMasterNode()]
1067 return (nl, nl)
1068
1071
1072 def Exec(self, feedback_fn):
1073
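# One migration job is submitted per instance that has this node as its
# primary; the jobs are returned to the caller rather than run inline.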
1074 jobs = [
1075 [opcodes.OpInstanceMigrate(
1076 instance_name=inst.name,
1077 mode=self.op.mode,
1078 live=self.op.live,
1079 iallocator=self.op.iallocator,
1080 target_node=self.op.target_node,
1081 allow_runtime_changes=self.op.allow_runtime_changes,
1082 ignore_ipolicy=self.op.ignore_ipolicy)]
1083 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1084
1085
1086
1087
1088
1089
1090 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1091 frozenset([self.op.node_uuid]))
1092
1093 return ResultWithJobs(jobs)
1094
1095
1106
1107
1108 class LUNodeModifyStorage(NoHooksLU):
1109 """Logical unit for modifying a storage volume on a node.
1110
1111 """
1112 REQ_BGL = False
1113
1114 def CheckArguments(self):
1115 (self.op.node_uuid, self.op.node_name) = \
1116 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1117
1118 storage_type = self.op.storage_type
1119
1120 try:
1121 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1122 except KeyError:
1123 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1124 " modified" % storage_type,
1125 errors.ECODE_INVAL)
1126
1127 diff = set(self.op.changes.keys()) - modifiable
1128 if diff:
1129 raise errors.OpPrereqError("The following fields can not be modified for"
1130 " storage units of type '%s': %r" %
1131 (storage_type, list(diff)),
1132 errors.ECODE_INVAL)
1133
1139
1140 def ExpandNames(self):
1141 self.needed_locks = {
1142 locking.LEVEL_NODE: self.op.node_uuid,
1143 }
1144
1145 def Exec(self, feedback_fn):
1146 """Computes the list of nodes and their attributes.
1147
1148 """
1149 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1150 result = self.rpc.call_storage_modify(self.op.node_uuid,
1151 self.op.storage_type, st_args,
1152 self.op.name, self.op.changes)
1153 result.Raise("Failed to modify storage unit '%s' on %s" %
1154 (self.op.name, self.op.node_name))
1155
1156
1157 class NodeQuery(QueryBase):
1158 FIELDS = query.NODE_FIELDS
1159
1160 def ExpandNames(self, lu):
1159
1161 lu.needed_locks = {}
1162 lu.share_locks = ShareAll()
1163
1164 if self.names:
1165 (self.wanted, _) = GetWantedNodes(lu, self.names)
1166 else:
1167 self.wanted = locking.ALL_SET
1168
1169 self.do_locking = (self.use_locking and
1170 query.NQ_LIVE in self.requested_data)
1171
1172 if self.do_locking:
1173
1174 lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1175 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1176
1179
1180 def _GetQueryData(self, lu):
1181 """Computes the list of nodes and their attributes.
1182
1183 """
1184 all_info = lu.cfg.GetAllNodesInfo()
1185
1186 node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1187
1188
1189 if query.NQ_LIVE in self.requested_data:
1190
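# Live data is only gathered from vm_capable nodes among those queried, and
# the storage units requested depend on whether LVM is enabled cluster-wide.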
1191 toquery_node_uuids = [node.uuid for node in all_info.values()
1192 if node.vm_capable and node.uuid in node_uuids]
1193 lvm_enabled = utils.storage.IsLvmEnabled(
1194 lu.cfg.GetClusterInfo().enabled_disk_templates)
1195
1196
1197
1198 raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1199 lu.cfg, include_spindles=lvm_enabled)
1200 storage_units = rpc.PrepareStorageUnitsForNodes(
1201 lu.cfg, raw_storage_units, toquery_node_uuids)
1202 default_hypervisor = lu.cfg.GetHypervisorType()
1203 hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1204 hvspecs = [(default_hypervisor, hvparams)]
1205 node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1206 hvspecs)
1207 live_data = dict(
1208 (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1209 require_spindles=lvm_enabled))
1210 for (uuid, nresult) in node_data.items()
1211 if not nresult.fail_msg and nresult.payload)
1212 else:
1213 live_data = None
1214
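# For instance-related fields, build reverse maps from each queried node
# UUID to the sets of instance UUIDs using it as primary or secondary node.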
1215 if query.NQ_INST in self.requested_data:
1216 node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1217 node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1218
1219 inst_data = lu.cfg.GetAllInstancesInfo()
1220 inst_uuid_to_inst_name = {}
1221
1222 for inst in inst_data.values():
1223 inst_uuid_to_inst_name[inst.uuid] = inst.name
1224 if inst.primary_node in node_to_primary:
1225 node_to_primary[inst.primary_node].add(inst.uuid)
1226 for secnode in inst.secondary_nodes:
1227 if secnode in node_to_secondary:
1228 node_to_secondary[secnode].add(inst.uuid)
1229 else:
1230 node_to_primary = None
1231 node_to_secondary = None
1232 inst_uuid_to_inst_name = None
1233
1234 if query.NQ_OOB in self.requested_data:
1235 oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1236 for uuid, node in all_info.iteritems())
1237 else:
1238 oob_support = None
1239
1240 if query.NQ_GROUP in self.requested_data:
1241 groups = lu.cfg.GetAllNodeGroupsInfo()
1242 else:
1243 groups = {}
1244
1245 return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1246 live_data, lu.cfg.GetMasterNode(),
1247 node_to_primary, node_to_secondary,
1248 inst_uuid_to_inst_name, groups, oob_support,
1249 lu.cfg.GetClusterInfo())
1250
1251
1252 class LUNodeQuery(NoHooksLU):
1253 """Logical unit for querying nodes.
1254
1255 """
1256
1257 REQ_BGL = False
1258
1262
1265
1268
1269 def Exec(self, feedback_fn):
1271
1272
1273 def _CheckOutputFields(static, dynamic, selected):
1274 """Checks whether all selected fields are valid.
1275
1276 @type static: L{utils.FieldSet}
1277 @param static: static fields set
1278 @type dynamic: L{utils.FieldSet}
1279 @param dynamic: dynamic fields set
1280
1281 """
1282 f = utils.FieldSet()
1283 f.Extend(static)
1284 f.Extend(dynamic)
1285
1286 delta = f.NonMatching(selected)
1287 if delta:
1288 raise errors.OpPrereqError("Unknown output fields selected: %s"
1289 % ",".join(delta), errors.ECODE_INVAL)
1290
1291
1292 class LUNodeQueryvols(NoHooksLU):
1293 """Logical unit for getting volumes on node(s).
1294
1295 """
1296 REQ_BGL = False
1297 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1298 _FIELDS_STATIC = utils.FieldSet("node")
1299
1304
1317
1318 def Exec(self, feedback_fn):
1319 """Computes the list of nodes and their attributes.
1320
1321 """
1322 node_uuids = self.owned_locks(locking.LEVEL_NODE)
1323 volumes = self.rpc.call_node_volumes(node_uuids)
1324
1325 ilist = self.cfg.GetAllInstancesInfo()
1326 vol2inst = MapInstanceLvsToNodes(ilist.values())
1327
1328 output = []
1329 for node_uuid in node_uuids:
1330 nresult = volumes[node_uuid]
1331 if nresult.offline:
1332 continue
1333 msg = nresult.fail_msg
1334 if msg:
1335 self.LogWarning("Can't compute volume data on node %s: %s",
1336 self.cfg.GetNodeName(node_uuid), msg)
1337 continue
1338
1339 node_vols = sorted(nresult.payload,
1340 key=operator.itemgetter("dev"))
1341
1342 for vol in node_vols:
1343 node_output = []
1344 for field in self.op.output_fields:
1345 if field == "node":
1346 val = self.cfg.GetNodeName(node_uuid)
1347 elif field == "phys":
1348 val = vol["dev"]
1349 elif field == "vg":
1350 val = vol["vg"]
1351 elif field == "name":
1352 val = vol["name"]
1353 elif field == "size":
1354 val = int(float(vol["size"]))
1355 elif field == "instance":
1356 inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
1357 None)
1358 if inst is not None:
1359 val = inst.name
1360 else:
1361 val = "-"
1362 else:
1363 raise errors.ParameterError(field)
1364 node_output.append(str(val))
1365
1366 output.append(node_output)
1367
1368 return output
1369
1370
1371 class LUNodeQueryStorage(NoHooksLU):
1372 """Logical unit for getting information on storage units on node(s).
1373
1374 """
1375 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1376 REQ_BGL = False
1377
1382
1395
1401
1402 def Exec(self, feedback_fn):
1403 """Computes the list of nodes and their attributes.
1404
1405 """
1406 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1407
1408
1409 if constants.SF_NAME in self.op.output_fields:
1410 fields = self.op.output_fields[:]
1411 else:
1412 fields = [constants.SF_NAME] + self.op.output_fields
1413
1414
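# SF_NODE and SF_TYPE are filled in locally per result row, so they are
# stripped from the field list that is sent to the node daemons.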
1415 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1416 while extra in fields:
1417 fields.remove(extra)
1418
1419 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1420 name_idx = field_idx[constants.SF_NAME]
1421
1422 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1423 data = self.rpc.call_storage_list(self.node_uuids,
1424 self.op.storage_type, st_args,
1425 self.op.name, fields)
1426
1427 result = []
1428
1429 for node_uuid in utils.NiceSort(self.node_uuids):
1430 node_name = self.cfg.GetNodeName(node_uuid)
1431 nresult = data[node_uuid]
1432 if nresult.offline:
1433 continue
1434
1435 msg = nresult.fail_msg
1436 if msg:
1437 self.LogWarning("Can't get storage data from node %s: %s",
1438 node_name, msg)
1439 continue
1440
1441 rows = dict([(row[name_idx], row) for row in nresult.payload])
1442
1443 for name in utils.NiceSort(rows.keys()):
1444 row = rows[name]
1445
1446 out = []
1447
1448 for field in self.op.output_fields:
1449 if field == constants.SF_NODE:
1450 val = node_name
1451 elif field == constants.SF_TYPE:
1452 val = self.op.storage_type
1453 elif field in field_idx:
1454 val = row[field_idx[field]]
1455 else:
1456 raise errors.ParameterError(field)
1457
1458 out.append(val)
1459
1460 result.append(out)
1461
1462 return result
1463
1464
1465 class LUNodeRemove(LogicalUnit):
1466 """Logical unit for removing a node.
1467
1468 """
1469 HPATH = "node-remove"
1470 HTYPE = constants.HTYPE_NODE
1471
1472 def BuildHooksEnv(self):
1473 """Build hooks env.
1474
1475 """
1476 return {
1477 "OP_TARGET": self.op.node_name,
1478 "NODE_NAME": self.op.node_name,
1479 }
1480
1481 def BuildHooksNodes(self):
1482 """Build hooks nodes.
1483
1484 This doesn't run on the target node in the pre phase as a failed
1485 node would then be impossible to remove.
1486
1487 """
1488 all_nodes = self.cfg.GetNodeList()
1489 try:
1490 all_nodes.remove(self.op.node_uuid)
1491 except ValueError:
1492 pass
1493 return (all_nodes, all_nodes)
1494
1523
1524 def Exec(self, feedback_fn):
1559
1560
1561 class LURepairNodeStorage(NoHooksLU):  # restored class header
1562 """Repairs the volume group on a node.
1563
1564 """
1565 REQ_BGL = False
1566
1578
1579 def ExpandNames(self):
1580 self.needed_locks = {
1581 locking.LEVEL_NODE: [self.op.node_uuid],
1582 }
1583
1599
1600 def CheckPrereq(self):
1601 """Check prerequisites.
1602
1603 """
1604 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1605
1606
1607 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1608 if not inst.disks_active:
1609 continue
1610 check_nodes = set(inst.all_nodes)
1611 check_nodes.discard(self.op.node_uuid)
1612 for inst_node_uuid in check_nodes:
1613 self._CheckFaultyDisks(inst, inst_node_uuid)
1614
1615 def Exec(self, feedback_fn):
1626