Package ganeti :: Package cmdlib :: Module cluster
[hide private]
[frames | no frames]

Source Code for Module ganeti.cmdlib.cluster

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21   
  22  """Logical units dealing with the cluster.""" 
  23   
  24  import OpenSSL 
  25   
  26  import copy 
  27  import itertools 
  28  import logging 
  29  import operator 
  30  import os 
  31  import re 
  32  import time 
  33   
  34  from ganeti import compat 
  35  from ganeti import constants 
  36  from ganeti import errors 
  37  from ganeti import hypervisor 
  38  from ganeti import locking 
  39  from ganeti import masterd 
  40  from ganeti import netutils 
  41  from ganeti import objects 
  42  from ganeti import opcodes 
  43  from ganeti import pathutils 
  44  from ganeti import query 
  45  from ganeti import rpc 
  46  from ganeti import runtime 
  47  from ganeti import ssh 
  48  from ganeti import uidpool 
  49  from ganeti import utils 
  50  from ganeti import vcluster 
  51   
  52  from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \ 
  53    ResultWithJobs 
  54  from ganeti.cmdlib.common import ShareAll, RunPostHook, \ 
  55    ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \ 
  56    GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  57    GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \ 
  58    CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \ 
  59    ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob 
  60   
  61  import ganeti.masterd.instance 
class LUClusterActivateMasterIp(NoHooksLU):
  """Logical unit that brings up the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Ask the master node to activate the master IP.

    The target node and the network settings both come from the master
    network parameters stored in the configuration; an RPC failure is
    reported through the result's C{Raise} method.

    @param feedback_fn: feedback callback (not used here)

    """
    net_params = self.cfg.GetMasterNetworkParameters()
    use_external_script = self.cfg.GetUseExternalMipScript()
    self.rpc.call_node_activate_master_ip(
      net_params.name, net_params, use_external_script
      ).Raise("Could not activate the master IP")
77
class LUClusterDeactivateMasterIp(NoHooksLU):
  """Logical unit that takes down the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Ask the master node to deactivate the master IP.

    The target node and the network settings both come from the master
    network parameters stored in the configuration; an RPC failure is
    reported through the result's C{Raise} method.

    @param feedback_fn: feedback callback (not used here)

    """
    net_params = self.cfg.GetMasterNetworkParameters()
    use_external_script = self.cfg.GetUseExternalMipScript()
    self.rpc.call_node_deactivate_master_ip(
      net_params.name, net_params, use_external_script
      ).Raise("Could not deactivate the master IP")
92
class LUClusterConfigQuery(NoHooksLU):
  """Return the values of the requested cluster configuration fields.

  All the work is delegated to a L{ClusterQuery} helper built from the
  opcode's C{output_fields}.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Create the query helper for the requested output fields.

    """
    fields = self.op.output_fields
    self.cq = ClusterQuery(None, fields, False)

  def ExpandNames(self):
    """Let the query helper set up the lock requirements.

    """
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    """Forward the lock declaration for C{level} to the query helper.

    """
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    """Run the query and return its single result row.

    @param feedback_fn: feedback callback (not used here)
    @return: the first (and only expected) row of the old-style query

    """
    rows = self.cq.OldStyleQuery(self)

    # Exactly one row is expected from a cluster query
    assert len(rows) == 1

    return rows[0]
115
class LUClusterDestroy(LogicalUnit):
  """Logical unit that destroys the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build the hooks environment.

    @return: dict with the cluster name as C{OP_TARGET}

    """
    env = {}
    env["OP_TARGET"] = self.cfg.GetClusterName()
    return env

  def BuildHooksNodes(self):
    """Build the hooks node lists.

    @return: a pair of empty lists; the post hook is run explicitly on
        the master from L{Exec}

    """
    pre_nodes = []
    post_nodes = []
    return (pre_nodes, post_nodes)

  def CheckPrereq(self):
    """Check that the cluster is empty.

    The only node left must be the master and no instance may exist.

    @raise errors.OpPrereqError: if other nodes or any instances are
        still part of the cluster

    """
    master = self.cfg.GetMasterNode()
    nodes = self.cfg.GetNodeList()

    only_master_left = (len(nodes) == 1 and nodes[0] == master)
    if not only_master_left:
      # The master itself is not counted in the message
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1),
                                 errors.ECODE_INVAL)

    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroy the cluster.

    Runs the post hooks on the master and then asks it to give up the
    master IP. A failure to do the latter is only logged as a warning.

    @param feedback_fn: feedback callback (not used here)
    @return: the name of the master node

    """
    net_params = self.cfg.GetMasterNetworkParameters()
    master_name = net_params.name

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_name)

    use_external_script = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_name,
                                                     net_params,
                                                     use_external_script)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Error disabling the master IP address: %s", msg)

    return master_name
176
class LUClusterPostInit(LogicalUnit):
  """Logical unit whose only purpose is running the cluster-init hooks.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build the hooks environment.

    @return: dict with the cluster name as C{OP_TARGET}

    """
    env = {}
    env["OP_TARGET"] = self.cfg.GetClusterName()
    return env

  def BuildHooksNodes(self):
    """Build the hooks node lists.

    @return: no pre-hook nodes, and the master node as the only
        post-hook node

    """
    master = self.cfg.GetMasterNode()
    return ([], [master])

  def Exec(self, feedback_fn):
    """Do nothing; the hooks are the whole point of this LU.

    @param feedback_fn: feedback callback (not used here)
    @return: always C{True}

    """
    return True
204
class ClusterQuery(QueryBase):
  """Query helper that gathers the data for cluster-level queries.

  """
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    """Declare that no locks are needed and reject locking requests.

    @param lu: the logical unit on whose behalf the query runs
    @raise errors.OpPrereqError: if locking was requested

    """
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if not self.do_locking:
      return

    raise errors.OpPrereqError("Can not use locking for cluster queries",
                               errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    """Nothing to declare, cluster queries do not lock.

    """
    pass

  def _GetQueryData(self, lu):
    """Collect the pieces of cluster data that were requested.

    Each of the three pieces (cluster configuration object, job queue
    drain flag, watcher pause) is only computed when listed in
    C{self.requested_data}; otherwise C{NotImplemented} is passed on in
    its place.

    @param lu: the logical unit on whose behalf the query runs
    @return: a L{query.ClusterQueryData} built from the three values

    """
    # Locking is not used
    assert (not compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) and
            not self.do_locking and not self.use_locking)

    wanted_data = self.requested_data

    # Placeholders for everything that was not asked for
    cluster = drain_flag = watcher_pause = NotImplemented

    if query.CQ_CONFIG in wanted_data:
      cluster = lu.cfg.GetClusterInfo()

    if query.CQ_QUEUE_DRAINED in wanted_data:
      # The queue counts as drained when the drain file exists
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)

    if query.CQ_WATCHER_PAUSE in wanted_data:
      master_name = lu.cfg.GetMasterNode()

      pause_result = lu.rpc.call_get_watcher_pause(master_name)
      pause_result.Raise("Can't retrieve watcher pause from master node '%s'" %
                         master_name)

      watcher_pause = pause_result.payload

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
class LUClusterQuery(NoHooksLU):
  """Query the cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """No locks are needed for this query.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Build and return the cluster information dictionary.

    Hypervisor-related entries (C{hvparams} and C{os_hvp}) are limited
    to the hypervisors currently enabled on the cluster.

    @param feedback_fn: feedback callback (not used here)
    @rtype: dict
    @return: version constants plus the cluster's settings

    """
    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors

    # Filter just for enabled hypervisors
    os_hvp = dict(
      (os_name, dict((hv_name, hv_params)
                     for (hv_name, hv_params) in hv_dict.items()
                     if hv_name in enabled_hvs))
      for (os_name, hv_dict) in cluster.os_hvp.items())

    hvparams = dict((hv_name, cluster.hvparams[hv_name])
                    for hv_name in enabled_hvs)

    # Convert ip_family to ip_version
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION
    else:
      primary_ip_version = constants.IP4_VERSION

    return {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": enabled_hvs,
      "hvparams": hvparams,
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }
333
class LUClusterRedistConf(NoHooksLU):
  """Force a redistribution of the cluster configuration.

  The configuration is re-saved unchanged and the ancillary files are
  pushed out again.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Request all node and node-allocation locks, in shared mode.

    """
    wanted_levels = (locking.LEVEL_NODE, locking.LEVEL_NODE_ALLOC)
    self.needed_locks = dict((level, locking.ALL_SET)
                             for level in wanted_levels)
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration and the ancillary files.

    @param feedback_fn: feedback callback, handed to the config update

    """
    cluster = self.cfg.GetClusterInfo()
    self.cfg.Update(cluster, feedback_fn)
    RedistributeAncillaryFiles(self)
356
class LUClusterRename(LogicalUnit):
  """Rename the cluster, possibly changing its master IP as well.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build the hooks environment.

    @return: dict with the current cluster name as C{OP_TARGET} and the
        requested one as C{NEW_NAME}

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      }
    env["NEW_NAME"] = self.op.name
    return env

  def BuildHooksNodes(self):
    """Build the hooks node lists.

    @return: the master node for the pre-hooks, all nodes for the
        post-hooks

    """
    pre_nodes = [self.cfg.GetMasterNode()]
    post_nodes = self.cfg.GetNodeList()
    return (pre_nodes, post_nodes)

  def CheckPrereq(self):
    """Resolve the new name and verify that it changes something.

    The resolved name replaces C{self.op.name} and the resolved address
    is remembered in C{self.ip} for L{Exec}.

    @raise errors.OpPrereqError: if neither name nor IP would change, or
        if a changed IP already answers on the node daemon port

    """
    resolved = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = resolved.name
    new_ip = resolved.ip
    self.ip = new_ip

    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    ip_changed = new_ip != old_ip

    if new_name == old_name and not ip_changed:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)

    # A new address must not be in use by anything else already
    if ip_changed and netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network" %
                                 new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master IP is shut down, the configuration and the known hosts
    file are updated and distributed, and finally the master IP is
    started again with the new address, whether or not the update
    succeeded.

    @param feedback_fn: feedback callback, handed to the config update
    @return: the new cluster name

    """
    new_name = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    net_params = self.cfg.GetMasterNetworkParameters()
    master_name = net_params.name
    use_external_script = self.cfg.GetUseExternalMipScript()
    shutdown = self.rpc.call_node_deactivate_master_ip(master_name,
                                                       net_params,
                                                       use_external_script)
    shutdown.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = new_name
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)

      # the master wrote the file itself, only the others need a copy
      targets = self.cfg.GetOnlineNodeList()
      if master_name in targets:
        targets.remove(master_name)
      UploadHelper(self, targets, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      net_params.ip = new_ip
      startup = self.rpc.call_node_activate_master_ip(master_name,
                                                      net_params,
                                                      use_external_script)
      msg = startup.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return new_name
441
class LUClusterRepairDiskSizes(NoHooksLU):
  """Verify the recorded disk sizes and correct them where needed.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Compute the locks needed for the selected (or all) instances.

    """
    if not self.op.instances:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    """Lock the primary nodes of explicitly selected instances.

    """
    if self.wanted_names is None:
      return
    if level == locking.LEVEL_NODE_RES:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Load the instance objects that are going to be checked.

    Without an explicit instance list, every instance whose lock is
    owned is used.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    name_info_pairs = self.cfg.GetMultiInstanceInfo(self.wanted_names)
    self.wanted_instances = [compat.snd(pair) for pair in name_info_pairs]

  def _EnsureChildSizes(self, disk):
    """Make sure the child of a disk is at least as big as the disk.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object
    @rtype: boolean
    @return: whether any size in the child chain was changed

    """
    if disk.dev_type != constants.LD_DRBD8:
      return False

    assert disk.children, "Empty children for DRBD8?"
    first_child = disk.children[0]
    too_small = first_child.size < disk.size
    if too_small:
      self.LogInfo("Child disk has size %d, parent %d, fixing",
                   first_child.size, disk.size)
      first_child.size = disk.size

    # and we recurse on this child only, not on the metadev
    return self._EnsureChildSizes(first_child) or too_small

  def Exec(self, feedback_fn):
    """Compare recorded disk sizes with what the primary nodes report.

    @param feedback_fn: feedback callback, handed to the config updates
    @rtype: list
    @return: one C{(instance name, disk index, new size)} tuple for
        every correction written to the configuration

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes

    # Group the disks by primary node so each node is asked only once
    per_node_disks = {}
    for instance in self.wanted_instances:
      node_entries = per_node_disks.setdefault(instance.primary_node, [])
      node_entries.extend((instance, idx, disk)
                          for (idx, disk) in enumerate(instance.disks))

    assert not (frozenset(per_node_disks) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for (node, entries) in per_node_disks.items():
      # Work on copies, so that setting the disk ID does not touch the
      # objects held in the configuration
      disk_copies = [entry[2].Copy() for entry in entries]
      for disk_copy in disk_copies:
        self.cfg.SetDiskID(disk_copy, node)

      result = self.rpc.call_blockdev_getsize(node, disk_copies)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue

      sizes = result.payload
      if len(sizes) != len(entries):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node, len(entries), sizes)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue

      for ((instance, idx, disk), size) in zip(entries, sizes):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue

        # NOTE(review): the shift presumably converts a size in bytes to
        # the MiB unit used for the recorded size -- confirm
        new_size = size >> 20
        if new_size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, new_size)
          disk.size = new_size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, new_size))

        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))

    return changed
570
def _ValidateNetmask(cfg, netmask):
  """Check a netmask against the cluster's primary IP family.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the primary IP family is not known or
      the netmask is not valid for it

  """
  family = cfg.GetPrimaryIPFamily()

  try:
    address_class = netutils.IPAddress.GetClassFromIpFamily(family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               family, errors.ECODE_INVAL)
  else:
    if address_class.ValidateNetmask(netmask):
      return

  raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                             netmask, errors.ECODE_INVAL)
591
592 593 -class LUClusterSetParams(LogicalUnit):
594 """Change the parameters of the cluster. 595 596 """ 597 HPATH = "cluster-modify" 598 HTYPE = constants.HTYPE_CLUSTER 599 REQ_BGL = False 600
601 - def CheckArguments(self):
602 """Check parameters 603 604 """ 605 if self.op.uid_pool: 606 uidpool.CheckUidPool(self.op.uid_pool) 607 608 if self.op.add_uids: 609 uidpool.CheckUidPool(self.op.add_uids) 610 611 if self.op.remove_uids: 612 uidpool.CheckUidPool(self.op.remove_uids) 613 614 if self.op.master_netmask is not None: 615 _ValidateNetmask(self.cfg, self.op.master_netmask) 616 617 if self.op.diskparams: 618 for dt_params in self.op.diskparams.values(): 619 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES) 620 try: 621 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS) 622 except errors.OpPrereqError, err: 623 raise errors.OpPrereqError("While verify diskparams options: %s" % err, 624 errors.ECODE_INVAL)
625
def ExpandNames(self):
  """Request every node, instance, node group and node-allocation lock.

  All of them are acquired in shared mode.

  """
  # FIXME: in the future maybe other cluster params won't require checking on
  # all nodes to be modified.
  # FIXME: This opcode changes cluster-wide settings. Is acquiring all
  # resource locks the right thing, shouldn't it be the BGL instead?
  wanted_levels = (
    locking.LEVEL_NODE,
    locking.LEVEL_INSTANCE,
    locking.LEVEL_NODEGROUP,
    locking.LEVEL_NODE_ALLOC,
    )
  self.needed_locks = dict((level, locking.ALL_SET)
                           for level in wanted_levels)
  self.share_locks = ShareAll()
638
def BuildHooksEnv(self):
  """Build the hooks environment.

  @return: dict with the cluster name as C{OP_TARGET} and the requested
      volume group name as C{NEW_VG_NAME}

  """
  env = {
    "OP_TARGET": self.cfg.GetClusterName(),
    }
  env["NEW_VG_NAME"] = self.op.vg_name
  return env
647
def BuildHooksNodes(self):
  """Build the hooks node lists.

  @return: the master node as the only pre-hook and only post-hook node

  """
  master = self.cfg.GetMasterNode()
  pre_nodes = [master]
  post_nodes = [master]
  return (pre_nodes, post_nodes)
654
655 - def CheckPrereq(self):
656 """Check prerequisites. 657 658 This checks whether the given params don't conflict and 659 if the given volume group is valid. 660 661 """ 662 if self.op.vg_name is not None and not self.op.vg_name: 663 if self.cfg.HasAnyDiskOfType(constants.LD_LV): 664 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based" 665 " instances exist", errors.ECODE_INVAL) 666 667 if self.op.drbd_helper is not None and not self.op.drbd_helper: 668 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8): 669 raise errors.OpPrereqError("Cannot disable drbd helper while" 670 " drbd-based instances exist", 671 errors.ECODE_INVAL) 672 673 node_list = self.owned_locks(locking.LEVEL_NODE) 674 675 vm_capable_nodes = [node.name 676 for node in self.cfg.GetAllNodesInfo().values() 677 if node.name in node_list and node.vm_capable] 678 679 # if vg_name not None, checks given volume group on all nodes 680 if self.op.vg_name: 681 vglist = self.rpc.call_vg_list(vm_capable_nodes) 682 for node in vm_capable_nodes: 683 msg = vglist[node].fail_msg 684 if msg: 685 # ignoring down node 686 self.LogWarning("Error while gathering data on node %s" 687 " (ignoring node): %s", node, msg) 688 continue 689 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload, 690 self.op.vg_name, 691 constants.MIN_VG_SIZE) 692 if vgstatus: 693 raise errors.OpPrereqError("Error on node '%s': %s" % 694 (node, vgstatus), errors.ECODE_ENVIRON) 695 696 if self.op.drbd_helper: 697 # checks given drbd helper on all nodes 698 helpers = self.rpc.call_drbd_helper(node_list) 699 for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list): 700 if ninfo.offline: 701 self.LogInfo("Not checking drbd helper on offline node %s", node) 702 continue 703 msg = helpers[node].fail_msg 704 if msg: 705 raise errors.OpPrereqError("Error checking drbd helper on node" 706 " '%s': %s" % (node, msg), 707 errors.ECODE_ENVIRON) 708 node_helper = helpers[node].payload 709 if node_helper != self.op.drbd_helper: 710 raise 
errors.OpPrereqError("Error on node '%s': drbd helper is %s" % 711 (node, node_helper), errors.ECODE_ENVIRON) 712 713 self.cluster = cluster = self.cfg.GetClusterInfo() 714 # validate params changes 715 if self.op.beparams: 716 objects.UpgradeBeParams(self.op.beparams) 717 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) 718 self.new_beparams = cluster.SimpleFillBE(self.op.beparams) 719 720 if self.op.ndparams: 721 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 722 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams) 723 724 # TODO: we need a more general way to handle resetting 725 # cluster-level parameters to default values 726 if self.new_ndparams["oob_program"] == "": 727 self.new_ndparams["oob_program"] = \ 728 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM] 729 730 if self.op.hv_state: 731 new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 732 self.cluster.hv_state_static) 733 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values)) 734 for hv, values in new_hv_state.items()) 735 736 if self.op.disk_state: 737 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, 738 self.cluster.disk_state_static) 739 self.new_disk_state = \ 740 dict((storage, dict((name, cluster.SimpleFillDiskState(values)) 741 for name, values in svalues.items())) 742 for storage, svalues in new_disk_state.items()) 743 744 if self.op.ipolicy: 745 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy, 746 group_policy=False) 747 748 all_instances = self.cfg.GetAllInstancesInfo().values() 749 violations = set() 750 for group in self.cfg.GetAllNodeGroupsInfo().values(): 751 instances = frozenset([inst for inst in all_instances 752 if compat.any(node in group.members 753 for node in inst.all_nodes)]) 754 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) 755 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group) 756 new = ComputeNewInstanceViolations(ipol, 757 new_ipolicy, instances, 
self.cfg) 758 if new: 759 violations.update(new) 760 761 if violations: 762 self.LogWarning("After the ipolicy change the following instances" 763 " violate them: %s", 764 utils.CommaJoin(utils.NiceSort(violations))) 765 766 if self.op.nicparams: 767 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) 768 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) 769 objects.NIC.CheckParameterSyntax(self.new_nicparams) 770 nic_errors = [] 771 772 # check all instances for consistency 773 for instance in self.cfg.GetAllInstancesInfo().values(): 774 for nic_idx, nic in enumerate(instance.nics): 775 params_copy = copy.deepcopy(nic.nicparams) 776 params_filled = objects.FillDict(self.new_nicparams, params_copy) 777 778 # check parameter syntax 779 try: 780 objects.NIC.CheckParameterSyntax(params_filled) 781 except errors.ConfigurationError, err: 782 nic_errors.append("Instance %s, nic/%d: %s" % 783 (instance.name, nic_idx, err)) 784 785 # if we're moving instances to routed, check that they have an ip 786 target_mode = params_filled[constants.NIC_MODE] 787 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: 788 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" 789 " address" % (instance.name, nic_idx)) 790 if nic_errors: 791 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % 792 "\n".join(nic_errors), errors.ECODE_INVAL) 793 794 # hypervisor list/parameters 795 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) 796 if self.op.hvparams: 797 for hv_name, hv_dict in self.op.hvparams.items(): 798 if hv_name not in self.new_hvparams: 799 self.new_hvparams[hv_name] = hv_dict 800 else: 801 self.new_hvparams[hv_name].update(hv_dict) 802 803 # disk template parameters 804 self.new_diskparams = objects.FillDict(cluster.diskparams, {}) 805 if self.op.diskparams: 806 for dt_name, dt_params in self.op.diskparams.items(): 807 if dt_name not in self.new_diskparams: 808 self.new_diskparams[dt_name] = 
dt_params 809 else: 810 self.new_diskparams[dt_name].update(dt_params) 811 812 # os hypervisor parameters 813 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) 814 if self.op.os_hvp: 815 for os_name, hvs in self.op.os_hvp.items(): 816 if os_name not in self.new_os_hvp: 817 self.new_os_hvp[os_name] = hvs 818 else: 819 for hv_name, hv_dict in hvs.items(): 820 if hv_dict is None: 821 # Delete if it exists 822 self.new_os_hvp[os_name].pop(hv_name, None) 823 elif hv_name not in self.new_os_hvp[os_name]: 824 self.new_os_hvp[os_name][hv_name] = hv_dict 825 else: 826 self.new_os_hvp[os_name][hv_name].update(hv_dict) 827 828 # os parameters 829 self.new_osp = objects.FillDict(cluster.osparams, {}) 830 if self.op.osparams: 831 for os_name, osp in self.op.osparams.items(): 832 if os_name not in self.new_osp: 833 self.new_osp[os_name] = {} 834 835 self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp, 836 use_none=True) 837 838 if not self.new_osp[os_name]: 839 # we removed all parameters 840 del self.new_osp[os_name] 841 else: 842 # check the parameter validity (remote check) 843 CheckOSParams(self, False, [self.cfg.GetMasterNode()], 844 os_name, self.new_osp[os_name]) 845 846 # changes to the hypervisor list 847 if self.op.enabled_hypervisors is not None: 848 self.hv_list = self.op.enabled_hypervisors 849 for hv in self.hv_list: 850 # if the hypervisor doesn't already exist in the cluster 851 # hvparams, we initialize it to empty, and then (in both 852 # cases) we make sure to fill the defaults, as we might not 853 # have a complete defaults list if the hypervisor wasn't 854 # enabled before 855 if hv not in new_hvp: 856 new_hvp[hv] = {} 857 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv]) 858 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES) 859 else: 860 self.hv_list = cluster.enabled_hypervisors 861 862 if self.op.hvparams or self.op.enabled_hypervisors is not None: 863 # either the enabled list has changed, or the 
parameters have, validate 864 for hv_name, hv_params in self.new_hvparams.items(): 865 if ((self.op.hvparams and hv_name in self.op.hvparams) or 866 (self.op.enabled_hypervisors and 867 hv_name in self.op.enabled_hypervisors)): 868 # either this is a new hypervisor, or its parameters have changed 869 hv_class = hypervisor.GetHypervisorClass(hv_name) 870 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 871 hv_class.CheckParameterSyntax(hv_params) 872 CheckHVParams(self, node_list, hv_name, hv_params) 873 874 self._CheckDiskTemplateConsistency() 875 876 if self.op.os_hvp: 877 # no need to check any newly-enabled hypervisors, since the 878 # defaults have already been checked in the above code-block 879 for os_name, os_hvp in self.new_os_hvp.items(): 880 for hv_name, hv_params in os_hvp.items(): 881 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 882 # we need to fill in the new os_hvp on top of the actual hv_p 883 cluster_defaults = self.new_hvparams.get(hv_name, {}) 884 new_osp = objects.FillDict(cluster_defaults, hv_params) 885 hv_class = hypervisor.GetHypervisorClass(hv_name) 886 hv_class.CheckParameterSyntax(new_osp) 887 CheckHVParams(self, node_list, hv_name, new_osp) 888 889 if self.op.default_iallocator: 890 alloc_script = utils.FindFile(self.op.default_iallocator, 891 constants.IALLOCATOR_SEARCH_PATH, 892 os.path.isfile) 893 if alloc_script is None: 894 raise errors.OpPrereqError("Invalid default iallocator script '%s'" 895 " specified" % self.op.default_iallocator, 896 errors.ECODE_INVAL)
897
def _CheckDiskTemplateConsistency(self):
  """Check that no disk template about to be disabled is still in use.

  A template is about to be disabled when it is enabled on the cluster
  now but missing from the opcode's C{enabled_disk_templates}. Nothing
  is checked when the opcode does not set that field.

  @raise errors.OpPrereqError: if an instance uses one of the disk
      templates that would be disabled

  """
  if not self.op.enabled_disk_templates:
    return

  cluster = self.cfg.GetClusterInfo()
  instances = self.cfg.GetAllInstancesInfo()

  disk_templates_to_remove = (set(cluster.enabled_disk_templates) -
                              set(self.op.enabled_disk_templates))

  for instance in instances.values():
    if instance.disk_template in disk_templates_to_remove:
      # Pass an error code like every other prerequisite error raised in
      # this module; the same code is used when disabling lvm or drbd is
      # refused because of existing instances
      raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                 " because instance '%s' is using it." %
                                 (instance.disk_template, instance.name),
                                 errors.ECODE_INVAL)
914
def Exec(self, feedback_fn):
  """Change the parameters of the cluster.

  Every parameter present in the opcode is copied onto the cluster object
  (or written through the configuration for the VG name and DRBD helper),
  then the configuration is saved once via C{self.cfg.Update}. Changing the
  master network device additionally stops the master IP before the save
  and starts it again afterwards.

  @param feedback_fn: callable taking a single message string, used to
      report progress and non-fatal problems

  """
  # An empty (but not None) value means "unset": it is normalised to None
  # before being compared with the current configuration.
  if self.op.vg_name is not None:
    new_volume = self.op.vg_name
    if not new_volume:
      new_volume = None
    if new_volume != self.cfg.GetVGName():
      self.cfg.SetVGName(new_volume)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
  if self.op.drbd_helper is not None:
    new_helper = self.op.drbd_helper
    if not new_helper:
      new_helper = None
    if new_helper != self.cfg.GetDRBDHelper():
      self.cfg.SetDRBDHelper(new_helper)
    else:
      feedback_fn("Cluster DRBD helper already in desired state,"
                  " not changing")
  # The self.new_* attributes are not computed in this method; presumably
  # they are prepared by CheckPrereq (not visible here) -- TODO confirm.
  if self.op.hvparams:
    self.cluster.hvparams = self.new_hvparams
  if self.op.os_hvp:
    self.cluster.os_hvp = self.new_os_hvp
  if self.op.enabled_hypervisors is not None:
    self.cluster.hvparams = self.new_hvparams
    self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
  if self.op.enabled_disk_templates:
    # Going through a set removes duplicate template names
    self.cluster.enabled_disk_templates = \
      list(set(self.op.enabled_disk_templates))
  if self.op.beparams:
    self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
  if self.op.nicparams:
    self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
  if self.op.ipolicy:
    self.cluster.ipolicy = self.new_ipolicy
  if self.op.osparams:
    self.cluster.osparams = self.new_osp
  if self.op.ndparams:
    self.cluster.ndparams = self.new_ndparams
  if self.op.diskparams:
    self.cluster.diskparams = self.new_diskparams
  if self.op.hv_state:
    self.cluster.hv_state_static = self.new_hv_state
  if self.op.disk_state:
    self.cluster.disk_state_static = self.new_disk_state

  if self.op.candidate_pool_size is not None:
    self.cluster.candidate_pool_size = self.op.candidate_pool_size
    # we need to update the pool size here, otherwise the save will fail
    AdjustCandidatePool(self, [])

  if self.op.maintain_node_health is not None:
    # Only a notice: the setting is stored even when CONFD is not available
    if self.op.maintain_node_health and not constants.ENABLE_CONFD:
      feedback_fn("Note: CONFD was disabled at build time, node health"
                  " maintenance is not useful (still enabling it)")
    self.cluster.maintain_node_health = self.op.maintain_node_health

  if self.op.modify_etc_hosts is not None:
    self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

  if self.op.prealloc_wipe_disks is not None:
    self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

  # UID pool: additions and removals are applied first, an explicitly given
  # pool then replaces the cluster's pool altogether.
  if self.op.add_uids is not None:
    uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

  if self.op.remove_uids is not None:
    uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

  if self.op.uid_pool is not None:
    self.cluster.uid_pool = self.op.uid_pool

  if self.op.default_iallocator is not None:
    self.cluster.default_iallocator = self.op.default_iallocator

  if self.op.reserved_lvs is not None:
    self.cluster.reserved_lvs = self.op.reserved_lvs

  if self.op.use_external_mip_script is not None:
    self.cluster.use_external_mip_script = self.op.use_external_mip_script

  def helper_os(aname, mods, desc):
    # Applies a list of (DDM_ADD|DDM_REMOVE, os_name) modifications in place
    # to the cluster attribute named "aname"; no-op changes are reported
    # through feedback_fn, unknown modification types are programmer errors.
    desc += " OS list"
    lst = getattr(self.cluster, aname)
    for key, val in mods:
      if key == constants.DDM_ADD:
        if val in lst:
          feedback_fn("OS %s already in %s, ignoring" % (val, desc))
        else:
          lst.append(val)
      elif key == constants.DDM_REMOVE:
        if val in lst:
          lst.remove(val)
        else:
          feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
      else:
        raise errors.ProgrammerError("Invalid modification '%s'" % key)

  if self.op.hidden_os:
    helper_os("hidden_os", self.op.hidden_os, "hidden")

  if self.op.blacklisted_os:
    helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

  if self.op.master_netdev:
    # The master IP is taken down on the old device before the new device
    # name is stored; it is brought up again after the configuration save.
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    feedback_fn("Shutting down master ip on the current netdev (%s)" %
                self.cluster.master_netdev)
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    # Without "force" a failed shutdown aborts the operation; with it the
    # failure is only reported.
    if not self.op.force:
      result.Raise("Could not disable the master ip")
    else:
      if result.fail_msg:
        msg = ("Could not disable the master ip (continuing anyway): %s" %
               result.fail_msg)
        feedback_fn(msg)
    feedback_fn("Changing master_netdev from %s to %s" %
                (master_params.netdev, self.op.master_netdev))
    self.cluster.master_netdev = self.op.master_netdev

  if self.op.master_netmask:
    master_params = self.cfg.GetMasterNetworkParameters()
    feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
    result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                      master_params.netmask,
                                                      self.op.master_netmask,
                                                      master_params.ip,
                                                      master_params.netdev)
    # A failed RPC is only reported; the new netmask is stored regardless
    if result.fail_msg:
      msg = "Could not change the master IP netmask: %s" % result.fail_msg
      feedback_fn(msg)

    self.cluster.master_netmask = self.op.master_netmask

  # Single save of all changes made to the cluster object above
  self.cfg.Update(self.cluster, feedback_fn)

  if self.op.master_netdev:
    # Parameters are re-read after the save before restarting the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    feedback_fn("Starting the master ip on the new master netdev (%s)" %
                self.op.master_netdev)
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    if result.fail_msg:
      self.LogWarning("Could not re-enable the master ip on"
                      " the master, please restart manually: %s",
                      result.fail_msg)
1067
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Declare that this LU needs no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Build the verification jobs and return them for submission.

    When a group name is given, only that group is verified. Otherwise a
    configuration verification job is created first, followed by one job per
    node group, each depending on the configuration job.

    @param feedback_fn: unused here
    @return: a L{ResultWithJobs} wrapping the list of jobs (each job being a
        list with a single opcode)

    """
    jobs = []
    single_group = bool(self.op.group_name)

    if single_group:
      groups = [self.op.group_name]
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

    for group in groups:
      if single_group:
        depends = None
      else:
        # Always depend on global verification; the dependency is relative
        # and computed before this job is appended, so it grows by one for
        # each group job and always points back at the first job.
        depends = [(-len(jobs), [])]

      jobs.append([
        opcodes.OpClusterVerifyGroup(group_name=group,
                                     ignore_errors=self.op.ignore_errors,
                                     depends=depends),
        ])

    # Fix up all parameters
    for job in jobs:
      for op in job:
        op.debug_simulate_errors = self.op.debug_simulate_errors
        op.verbose = self.op.verbose
        op.error_codes = self.op.error_codes
        try:
          op.skip_checks = self.op.skip_checks
        except AttributeError:
          # Only opcodes other than the group verification may reject
          # "skip_checks"
          assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
1112
1113 1114 -class _VerifyErrors(object):
1115 """Mix-in for cluster/group verify LUs. 1116 1117 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects 1118 self.op and self._feedback_fn to be available.) 1119 1120 """ 1121 1122 ETYPE_FIELD = "code" 1123 ETYPE_ERROR = "ERROR" 1124 ETYPE_WARNING = "WARNING" 1125
1126 - def _Error(self, ecode, item, msg, *args, **kwargs):
1127 """Format an error message. 1128 1129 Based on the opcode's error_codes parameter, either format a 1130 parseable error code, or a simpler error string. 1131 1132 This must be called only from Exec and functions called from Exec. 1133 1134 """ 1135 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) 1136 itype, etxt, _ = ecode 1137 # If the error code is in the list of ignored errors, demote the error to a 1138 # warning 1139 if etxt in self.op.ignore_errors: # pylint: disable=E1101 1140 ltype = self.ETYPE_WARNING 1141 # first complete the msg 1142 if args: 1143 msg = msg % args 1144 # then format the whole message 1145 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101 1146 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) 1147 else: 1148 if item: 1149 item = " " + item 1150 else: 1151 item = "" 1152 msg = "%s: %s%s: %s" % (ltype, itype, item, msg) 1153 # and finally report it via the feedback_fn 1154 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101 1155 # do not mark the operation as failed for WARN cases only 1156 if ltype == self.ETYPE_ERROR: 1157 self.bad = True
1158
1159 - def _ErrorIf(self, cond, *args, **kwargs):
1160 """Log an error message if the passed condition is True. 1161 1162 """ 1163 if (bool(cond) 1164 or self.op.debug_simulate_errors): # pylint: disable=E1101 1165 self._Error(*args, **kwargs)
1166
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file
  @rtype: tuple
  @return: (error type, message); the error type is None when the
      certificate is fine, otherwise one of the ETYPE_* values of
      L{LUClusterVerifyConfig}
  @raise errors.ProgrammerError: if the certificate check returns an
      unknown status code

  """
  try:
    pem_data = utils.ReadFile(filename)
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           pem_data)
  except Exception as err: # pylint: disable=W0703
    # Any failure to read or parse the file is reported as a verify error
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  fnamemsg = None
  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)

  if errcode is None:
    return (None, fnamemsg)

  if errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)

  if errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1199
1200 1201 -def _GetAllHypervisorParameters(cluster, instances):
1202 """Compute the set of all hypervisor parameters. 1203 1204 @type cluster: L{objects.Cluster} 1205 @param cluster: the cluster object 1206 @param instances: list of L{objects.Instance} 1207 @param instances: additional instances from which to obtain parameters 1208 @rtype: list of (origin, hypervisor, parameters) 1209 @return: a list with all parameters found, indicating the hypervisor they 1210 apply to, and the origin (can be "cluster", "os X", or "instance Y") 1211 1212 """ 1213 hvp_data = [] 1214 1215 for hv_name in cluster.enabled_hypervisors: 1216 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) 1217 1218 for os_name, os_hvp in cluster.os_hvp.items(): 1219 for hv_name, hv_params in os_hvp.items(): 1220 if hv_params: 1221 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) 1222 hvp_data.append(("os %s" % os_name, hv_name, full_params)) 1223 1224 # TODO: collapse identical parameter values in a single one 1225 for instance in instances: 1226 if instance.hvparams: 1227 hvp_data.append(("instance %s" % instance.name, instance.hypervisor, 1228 cluster.FillHV(instance))) 1229 1230 return hvp_data
1231
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    @param hvp_data: list of (source, hypervisor name, parameters) tuples,
        as returned by L{_GetAllHypervisorParameters}

    """
    for (source, hv_name, hv_params) in hvp_data:
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError as err:
        # The hypervisor name comes first and the source second; the values
        # are passed as arguments so that "%" characters in them cannot
        # break the formatting.
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None,
                      "hypervisor %s parameters syntax check (source %s): %s",
                      hv_name, source, str(err))

  def ExpandNames(self):
    """Request all locks at all levels, in shared mode.

    """
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    Only caches the configuration data used by L{Exec}.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    @param feedback_fn: function used to report progress and problems
    @rtype: boolean
    @return: True if no error (as opposed to warning) was found

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    # Note: this is a set of node names, not of node objects
    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    # The set holds plain names, so they are used directly (and sorted, to
    # get a stable message)
    pretty_dangling = [
      "%s (%s)" %
      (node_name,
       utils.CommaJoin(dangling_instances.get(node_name,
                                              ["no instances"])))
      for node_name in utils.NiceSort(dangling_nodes)]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad
1332
1333 1334 -class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1335 """Verifies the status of a node group. 1336 1337 """ 1338 HPATH = "cluster-verify" 1339 HTYPE = constants.HTYPE_CLUSTER 1340 REQ_BGL = False 1341 1342 _HOOKS_INDENT_RE = re.compile("^", re.M) 1343
class NodeImage(object):
  """A class representing the logical and physical status of a node.

  @type name: string
  @ivar name: the node name to which this object refers
  @ivar volumes: a structure as returned from
      L{ganeti.backend.GetVolumeList} (runtime)
  @ivar instances: a list of running instances (runtime)
  @ivar pinst: list of configured primary instances (config)
  @ivar sinst: list of configured secondary instances (config)
  @ivar sbp: dictionary of {primary-node: list of instances} for all
      instances for which this node is secondary (config)
  @ivar mfree: free memory, as reported by hypervisor (runtime)
  @ivar dfree: free disk, as reported by the node (runtime)
  @ivar offline: the offline status (config)
  @type rpc_fail: boolean
  @ivar rpc_fail: whether the RPC verify call was successful (overall,
      not whether the individual keys were correct) (runtime)
  @type lvm_fail: boolean
  @ivar lvm_fail: whether the RPC call didn't return valid LVM data
  @type hyp_fail: boolean
  @ivar hyp_fail: whether the RPC call didn't return the instance list
  @type ghost: boolean
  @ivar ghost: whether this is a known node or not (config)
  @type os_fail: boolean
  @ivar os_fail: whether the RPC call didn't return valid OS data
  @type oslist: list
  @ivar oslist: list of OSes as diagnosed by DiagnoseOS
  @type vm_capable: boolean
  @ivar vm_capable: whether the node can host instances
  @type pv_min: float
  @ivar pv_min: size in MiB of the smallest PVs
  @type pv_max: float
  @ivar pv_max: size in MiB of the biggest PVs

  """
  def __init__(self, offline=False, name=None, vm_capable=True):
    # Values given by the caller
    self.name = name
    self.offline = offline
    self.vm_capable = vm_capable

    # Instance/volume bookkeeping, starting out empty
    self.volumes = {}
    self.instances = []
    self.pinst = []
    self.sinst = []
    self.sbp = {}
    self.oslist = {}

    # Resource figures, unknown until filled in
    self.mfree = 0
    self.dfree = 0
    self.pv_min = None
    self.pv_max = None

    # Failure and status flags, all cleared initially
    self.rpc_fail = False
    self.lvm_fail = False
    self.hyp_fail = False
    self.os_fail = False
    self.ghost = False
1399
def ExpandNames(self):
  """Resolve the node group and declare the locks needed to verify it.

  All locks are requested in shared mode. The node locks are only filled
  in later, by L{DeclareLocks}.

  """
  # This raises errors.OpPrereqError on its own:
  group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
  self.group_uuid = group_uuid

  # Get instances in node group; this is unsafe and needs verification later
  group_instances = self.cfg.GetNodeGroupInstances(group_uuid,
                                                   primary_only=True)

  needed = {}
  needed[locking.LEVEL_INSTANCE] = group_instances
  needed[locking.LEVEL_NODEGROUP] = [group_uuid]
  needed[locking.LEVEL_NODE] = []
  # This opcode is run by watcher every five minutes and acquires all nodes
  # for a group. It doesn't run for a long time, so it's better to acquire
  # the node allocation lock as well.
  needed[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
  self.needed_locks = needed

  self.share_locks = ShareAll()
1420
def DeclareLocks(self, level):
  """Compute the node locks once the instance locks are held.

  Only the node level is handled; any other level is left untouched.

  @param level: the locking level being declared

  """
  if level != locking.LEVEL_NODE:
    return

  # Get members of node group; this is unsafe and needs verification later
  wanted_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

  instances = self.cfg.GetAllInstancesInfo()

  # In Exec(), we warn about mirrored instances that have primary and
  # secondary living in separate node groups. To fully verify that
  # volumes for these instances are healthy, we will need to do an
  # extra call to their secondaries. We ensure here those nodes will
  # be locked.
  for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
    # Important: access only the instances whose lock is owned
    inst = instances[inst_name]
    if inst.disk_template in constants.DTS_INT_MIRROR:
      wanted_nodes.update(inst.secondary_nodes)

  self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
1439
def CheckPrereq(self):
  """Check that all needed locks are held and cache configuration data.

  @raise errors.OpPrereqError: if a node or instance of the group, or a
      node needed for the split-LV check, is not locked

  """
  assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
  self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

  group_nodes = set(self.group_info.members)
  group_instances = \
    self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

  # The group membership may have changed since the locks were declared
  nodes_missing = \
    group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
  instances_missing = \
    group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

  if nodes_missing:
    raise errors.OpPrereqError("Missing lock for nodes: %s" %
                               utils.CommaJoin(nodes_missing),
                               errors.ECODE_STATE)

  if instances_missing:
    raise errors.OpPrereqError("Missing lock for instances: %s" %
                               utils.CommaJoin(instances_missing),
                               errors.ECODE_STATE)

  self.all_node_info = self.cfg.GetAllNodesInfo()
  self.all_inst_info = self.cfg.GetAllInstancesInfo()

  self.my_node_names = utils.NiceSort(group_nodes)
  self.my_inst_names = utils.NiceSort(group_instances)

  self.my_node_info = dict((node_name, self.all_node_info[node_name])
                           for node_name in self.my_node_names)

  self.my_inst_info = dict((inst_name, self.all_inst_info[inst_name])
                           for inst_name in self.my_inst_names)

  # We detect here the nodes that will need the extra RPC calls for verifying
  # split LV volumes; they should be locked.
  extra_lv_nodes = set(
    nname
    for inst in self.my_inst_info.values()
    if inst.disk_template in constants.DTS_INT_MIRROR
    for nname in inst.all_nodes
    if self.all_node_info[nname].group != self.group_uuid)

  lv_nodes_missing = \
    extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

  if lv_nodes_missing:
    raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                               utils.CommaJoin(lv_nodes_missing),
                               errors.ECODE_STATE)
  self.extra_lv_nodes = list(extra_lv_nodes)
1494
def _VerifyNode(self, ninfo, nresult):
  """Perform some basic validation on data returned from a node.

    - check the result data structure is well formed and has all the
      mandatory fields
    - check ganeti version

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the results from the node
  @rtype: boolean
  @return: whether overall this call was successful (and we can expect
       reasonable values in the response)

  """
  node = ninfo.name

  # main result, nresult should be a non-empty dict
  no_data = not nresult or not isinstance(nresult, dict)
  self._ErrorIf(no_data, constants.CV_ENODERPC, node,
                "unable to verify node: no data returned")
  if no_data:
    return False

  # compares ganeti version; a (protocol, release) pair is expected
  local_version = constants.PROTOCOL_VERSION
  remote_version = nresult.get("version", None)
  malformed = not (remote_version and
                   isinstance(remote_version, (list, tuple)) and
                   len(remote_version) == 2)
  self._ErrorIf(malformed, constants.CV_ENODERPC, node,
                "connection to node returned invalid data")
  if malformed:
    return False

  (remote_protocol, remote_release) = (remote_version[0], remote_version[1])

  incompatible = local_version != remote_protocol
  self._ErrorIf(incompatible, constants.CV_ENODEVERSION, node,
                "incompatible protocol versions: master %s,"
                " node %s", local_version, remote_protocol)
  if incompatible:
    return False

  # node seems compatible, we can actually try to look into its results

  # full package version; a mismatch is only a warning
  self._ErrorIf(constants.RELEASE_VERSION != remote_release,
                constants.CV_ENODEVERSION, node,
                "software version mismatch: master %s, node %s",
                constants.RELEASE_VERSION, remote_release,
                code=self.ETYPE_WARNING)

  # per-hypervisor verification: any value other than None is a failure
  hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
  if ninfo.vm_capable and isinstance(hyp_result, dict):
    for (hv_name, hv_result) in hyp_result.items():
      self._ErrorIf(hv_result is not None, constants.CV_ENODEHV, node,
                    "hypervisor %s verify failure: '%s'", hv_name, hv_result)

  # hypervisor parameter verification: every returned entry is a failure
  hvp_result = nresult.get(constants.NV_HVPARAMS, None)
  if ninfo.vm_capable and isinstance(hvp_result, list):
    for (item, hv_name, hv_result) in hvp_result:
      self._ErrorIf(True, constants.CV_ENODEHV, node,
                    "hypervisor %s parameter verify failure (source %s): %s",
                    hv_name, item, hv_result)

  # a missing node setup result is reported as an error of its own
  setup_errors = nresult.get(constants.NV_NODESETUP,
                             ["Missing NODESETUP results"])
  self._ErrorIf(setup_errors, constants.CV_ENODESETUP, node,
                "node setup error: %s", "; ".join(setup_errors))

  return True
1567
def _VerifyNodeTime(self, ninfo, nresult,
                    nvinfo_starttime, nvinfo_endtime):
  """Check the node time.

  The time reported by the node must lie within the window of the RPC call,
  extended on both sides by the maximum allowed clock skew.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nvinfo_starttime: the start time of the RPC call
  @param nvinfo_endtime: the end time of the RPC call

  """
  node = ninfo.name

  try:
    node_time = utils.MergeTime(nresult.get(constants.NV_TIME, None))
  except (ValueError, TypeError):
    self._ErrorIf(True, constants.CV_ENODETIME, node,
                  "Node returned invalid time")
    return

  divergence = None
  if node_time < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
    # node clock is behind
    divergence = "%.01fs" % abs(nvinfo_starttime - node_time)
  elif node_time > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
    # node clock is ahead
    divergence = "%.01fs" % abs(node_time - nvinfo_endtime)

  self._ErrorIf(divergence is not None, constants.CV_ENODETIME, node,
                "Node time diverges by at least %s from master node time",
                divergence)
1599
def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
  """Check the node LVM results and update info for cross-node checks.

  Nothing is checked when no volume group is configured.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param vg_name: the configured VG name
  @type nimg: L{NodeImage}
  @param nimg: node image

  """
  if vg_name is None:
    return

  node = ninfo.name

  # checks vg existence and size > 20G
  vglist = nresult.get(constants.NV_VGLIST, None)
  vglist_missing = not vglist
  self._ErrorIf(vglist_missing, constants.CV_ENODELVM, node,
                "unable to check volume groups")
  if not vglist_missing:
    # the status doubles as the error message
    vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                          constants.MIN_VG_SIZE)
    self._ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

  # Check PVs
  (pv_errors, pv_sizes) = CheckNodePVs(nresult, self._exclusive_storage)
  for pv_error in pv_errors:
    self._Error(constants.CV_ENODELVM, node, pv_error)
  if pv_sizes is not None:
    # remembered for the group-wide PV size comparison
    (nimg.pv_min, nimg.pv_max) = pv_sizes
1632
def _VerifyGroupLVM(self, node_image, vg_name):
  """Check cross-node consistency in LVM.

  @type node_image: dict
  @param node_image: info about nodes, mapping from node names to
    L{NodeImage} objects
  @param vg_name: the configured VG name

  """
  # Only exclusive storage needs this kind of checks, and only when LVM is
  # configured at all
  if vg_name is None or not self._exclusive_storage:
    return

  # exclusive_storage wants all PVs to have the same size (approximately),
  # if the smallest and the biggest ones are okay, everything is fine.
  # pv_min is None iff pv_max is None
  sized_nodes = [nimg for nimg in node_image.values()
                 if nimg.pv_min is not None]
  if not sized_nodes:
    return

  (pvmin, minnode) = min((nimg.pv_min, nimg.name) for nimg in sized_nodes)
  (pvmax, maxnode) = max((nimg.pv_max, nimg.name) for nimg in sized_nodes)

  sizes_differ = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
  self._ErrorIf(sizes_differ, constants.CV_EGROUPDIFFERENTPVSIZE,
                self.group_info.name,
                "PV sizes differ too much in the group; smallest (%s MB) is"
                " on %s, biggest (%s MB) is on %s",
                pvmin, minnode, pvmax, maxnode)
1662
def _VerifyNodeBridges(self, ninfo, nresult, bridges):
  """Check the node bridges.

  Nothing is checked when no bridges are expected.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param bridges: the expected list of bridges

  """
  if not bridges:
    return

  node = ninfo.name

  # the node reports the bridges it is missing, as a list
  missing = nresult.get(constants.NV_BRIDGES, None)
  invalid = not isinstance(missing, list)
  self._ErrorIf(invalid, constants.CV_ENODENET, node,
                "did not return valid bridge information")
  if invalid:
    return

  self._ErrorIf(bool(missing), constants.CV_ENODENET, node,
                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1685
def _VerifyNodeUserScripts(self, ninfo, nresult):
  """Check the results of user scripts presence and executability on the node

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node

  """
  node = ninfo.name

  no_answer = constants.NV_USERSCRIPTS not in nresult
  self._ErrorIf(no_answer, constants.CV_ENODEUSERSCRIPTS, node,
                "did not return user scripts information")
  if no_answer:
    return

  # the node lists the scripts it found missing or not executable
  broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
  self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                "user scripts not present or not executable: %s" %
                utils.CommaJoin(sorted(broken_scripts)))
1705
def _VerifyNodeNetwork(self, ninfo, nresult):
  """Check the node network connectivity results.

  Three results are looked at: ssh connectivity to other nodes, tcp
  connectivity to other nodes and reachability of the master IP.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node

  """
  node = ninfo.name

  # ssh connectivity: every returned entry is a failure
  missing = constants.NV_NODELIST not in nresult
  self._ErrorIf(missing, constants.CV_ENODESSH, node,
                "node hasn't returned node ssh connectivity data")
  if not missing:
    ssh_failures = nresult[constants.NV_NODELIST]
    if ssh_failures:
      for (a_node, a_msg) in ssh_failures.items():
        self._ErrorIf(True, constants.CV_ENODESSH, node,
                      "ssh communication with node '%s': %s", a_node, a_msg)

  # tcp connectivity: failures are reported in sorted node order
  missing = constants.NV_NODENETTEST not in nresult
  self._ErrorIf(missing, constants.CV_ENODENET, node,
                "node hasn't returned node tcp connectivity data")
  if not missing:
    tcp_failures = nresult[constants.NV_NODENETTEST]
    if tcp_failures:
      for anode in utils.NiceSort(tcp_failures.keys()):
        self._ErrorIf(True, constants.CV_ENODENET, node,
                      "tcp communication with node '%s': %s",
                      anode, tcp_failures[anode])

  # master IP reachability
  missing = constants.NV_MASTERIP not in nresult
  self._ErrorIf(missing, constants.CV_ENODENET, node,
                "node hasn't returned node master IP reachability data")
  if not missing and not nresult[constants.NV_MASTERIP]:
    if node == self.master_node:
      msg = "the master node cannot reach the master IP (not configured?)"
    else:
      msg = "cannot reach the master IP"
    self._ErrorIf(True, constants.CV_ENODENET, node, msg)
1747
def _VerifyInstance(self, instance, inst_config, node_image,
                    diskstatus):
  """Verify an instance.

  This function checks to see if the required block devices are
  available on the instance's node, and that the nodes are in the correct
  state.

  @param instance: the instance name, used as the item in reported errors
      and looked up in the primary node's list of running instances
  @param inst_config: the instance's configuration object
  @param node_image: dictionary indexed by node name, holding the per-node
      status (offline, rpc_fail, lvm_fail, ghost, vm_capable, volumes and
      instances are read here)
  @param diskstatus: dictionary mapping a node name to a list of
      (success, status) pairs, one per disk index

  """
  _ErrorIf = self._ErrorIf # pylint: disable=C0103
  pnode = inst_config.primary_node
  pnode_img = node_image[pnode]
  groupinfo = self.cfg.GetAllNodeGroupsInfo()

  # Filled by MapLVsByNode; used below as {node: volumes}
  node_vol_should = {}
  inst_config.MapLVsByNode(node_vol_should)

  # Instance policy violations are only warnings
  cluster = self.cfg.GetClusterInfo()
  ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                          self.group_info)
  err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
  _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
           code=self.ETYPE_WARNING)

  # Every volume the instance should have must exist on its node
  for node in node_vol_should:
    n_img = node_image[node]
    if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
      # ignore missing volumes on offline or broken nodes
      continue
    for volume in node_vol_should[node]:
      test = volume not in n_img.volumes
      _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
               "volume %s missing on node %s", volume, node)

  # An instance marked as up must run on its primary node, which in turn
  # must not be offline
  if inst_config.admin_state == constants.ADMINST_UP:
    test = instance not in pnode_img.instances and not pnode_img.offline
    _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
             "instance not running on its primary node %s",
             pnode)
    _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
             "instance is marked as running and lives on offline node %s",
             pnode)

  # Flatten the per-node disk status into (node, success, status, index)
  diskdata = [(nname, success, status, idx)
              for (nname, disks) in diskstatus.items()
              for idx, (success, status) in enumerate(disks)]

  for nname, success, bdev_status, idx in diskdata:
    # the 'ghost node' construction in Exec() ensures that we have a
    # node here
    snode = node_image[nname]
    bad_snode = snode.ghost or snode.offline
    # Disk problems are only reported while the disks are active; a failed
    # status query is additionally ignored for ghost and offline nodes
    _ErrorIf(inst_config.disks_active and
             not success and not bad_snode,
             constants.CV_EINSTANCEFAULTYDISK, instance,
             "couldn't retrieve status for disk/%s on %s: %s",
             idx, nname, bdev_status)
    _ErrorIf((inst_config.disks_active and
              success and bdev_status.ldisk_status == constants.LDS_FAULTY),
             constants.CV_EINSTANCEFAULTYDISK, instance,
             "disk/%s on %s is faulty", idx, nname)

  # Note: this error is reported against the primary node, not the instance
  _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
           constants.CV_ENODERPC, pnode, "instance %s, connection to"
           " primary node failed", instance)

  _ErrorIf(len(inst_config.secondary_nodes) > 1,
           constants.CV_EINSTANCELAYOUT,
           instance, "instance has multiple secondary nodes: %s",
           utils.CommaJoin(inst_config.secondary_nodes),
           code=self.ETYPE_WARNING)

  if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
    # Disk template not compatible with exclusive_storage: no instance
    # node should have the flag set
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                   inst_config.all_nodes)
    es_nodes = [n for (n, es) in es_flags.items()
                if es]
    _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
             "instance has template %s, which is not supported on nodes"
             " that have exclusive storage set: %s",
             inst_config.disk_template, utils.CommaJoin(es_nodes))

  if inst_config.disk_template in constants.DTS_INT_MIRROR:
    # Group the instance's nodes by node group; more than one group means
    # the instance is split across groups (warning only)
    instance_nodes = utils.NiceSort(inst_config.all_nodes)
    instance_groups = {}

    for node in instance_nodes:
      instance_groups.setdefault(self.all_node_info[node].group,
                                 []).append(node)

    pretty_list = [
      "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
      # Sort so that we always list the primary node first.
      for group, nodes in sorted(instance_groups.items(),
                                 key=lambda (_, nodes): pnode in nodes,
                                 reverse=True)]

    self._ErrorIf(len(instance_groups) > 1,
                  constants.CV_EINSTANCESPLITGROUPS,
                  instance, "instance has primary and secondary nodes in"
                  " different groups: %s", utils.CommaJoin(pretty_list),
                  code=self.ETYPE_WARNING)

  inst_nodes_offline = []
  for snode in inst_config.secondary_nodes:
    s_img = node_image[snode]
    # Reported against the secondary node, not the instance
    _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
             snode, "instance %s, connection to secondary node failed",
             instance)

    if s_img.offline:
      inst_nodes_offline.append(snode)

  # warn that the instance lives on offline nodes
  _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
           "instance has offline secondary node(s) %s",
           utils.CommaJoin(inst_nodes_offline))
  # ... or ghost/non-vm_capable nodes
  for node in inst_config.all_nodes:
    _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
             instance, "instance lives on ghost node %s", node)
    _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
             instance, "instance lives on non-vm_capable node %s", node)
1873
def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
  """Report volumes found on nodes that nothing is known to own.

  Nodes that are offline, whose RPC or LVM query failed, or that belong
  to a group other than C{self.group_uuid} are skipped. On every other
  node, a volume is reported as unknown when it is not listed for that
  node in C{node_vol_should} and is not matched by C{reserved}.

  @param node_vol_should: container mapping node names to the volumes
      expected on them
  @param node_image: dict mapping node names to node image objects
  @type reserved: L{ganeti.utils.FieldSet}
  @param reserved: a FieldSet of reserved volume names

  """
  for (nname, nimg) in node_image.items():
    unhealthy = nimg.offline or nimg.rpc_fail or nimg.lvm_fail
    # The group lookup only happens for healthy nodes (short-circuit)
    if unhealthy or self.all_node_info[nname].group != self.group_uuid:
      continue

    for volume in nimg.volumes:
      expected = (nname in node_vol_should and
                  volume in node_vol_should[nname])
      orphaned = not expected and not reserved.Matches(volume)
      self._ErrorIf(orphaned, constants.CV_ENODEORPHANLV, nname,
                    "volume %s is unknown", volume)
1895
def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
  """Verify N+1 memory resilience.

  For every node, and for each primary node whose instances it holds as
  secondary, add up the minimum memory of the auto-balanced instances and
  report an error if that sum exceeds the node's free memory, i.e. if the
  node could not start them should that primary node fail.

  @param node_image: dict mapping node names to node image objects
  @param instance_cfg: dict mapping instance names to their configuration

  """
  cluster_info = self.cfg.GetClusterInfo()

  for (nname, nimg) in node_image.items():
    # This code checks that every node which is now listed as
    # secondary has enough memory to host all instances it is
    # supposed to should a single other node in the cluster fail.
    # FIXME: not ready for failover to an arbitrary node
    # FIXME: does not support file-backed instances
    # WARNING: we currently take into account down instances as well
    # as up ones, considering that even if they're down someone
    # might want to start them even in the event of a node failure.
    if nimg.offline or self.all_node_info[nname].group != self.group_uuid:
      # Offline nodes and nodes in other groups are left out of the N+1
      # warning, since most likely we don't have good memory information
      # from them; instances living on such nodes are already listed,
      # and that's enough warning
      continue

    #TODO(dynmem): also consider ballooning out other instances
    for (prinode, inst_names) in nimg.sbp.items():
      filled = (cluster_info.FillBE(instance_cfg[iname])
                for iname in inst_names)
      needed_mem = sum(bep[constants.BE_MINMEM]
                       for bep in filled
                       if bep[constants.BE_AUTO_BALANCE])
      self._ErrorIf(nimg.mfree < needed_mem, constants.CV_ENODEN1, nname,
                    "not enough memory to accomodate instance failovers"
                    " should node %s fail (%dMiB needed, %dMiB available)",
                    prinode, needed_mem, nimg.mfree)
@classmethod
def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, filesets):
  """Verifies file checksums collected from all nodes.

  @param errorif: Callback for reporting errors
  @param nodeinfo: List of L{objects.Node} objects
  @param master_node: Name of master node
  @param all_nvinfo: RPC results, indexed by node name
  @param filesets: four-element sequence C{(files_all, files_opt,
      files_mc, files_vm)} of filename sets: files expected on every
      node, optional files (which must exist on all or on none of the
      expected nodes), files expected on master candidates and the
      master node, and files expected on vm_capable nodes. This used to
      be a tuple parameter in the signature; it is unpacked here instead
      so that the definition is also valid Python 3 syntax. Positional
      callers are unaffected.

  """
  (files_all, files_opt, files_mc, files_vm) = filesets

  # Define functions determining which nodes to consider for a file
  files2nodefn = [
    (files_all, None),
    (files_mc, lambda node: (node.master_candidate or
                             node.name == master_node)),
    (files_vm, lambda node: node.vm_capable),
    ]

  # Build mapping from filename to the names of the nodes which should
  # have the file; the name set is computed once per file group instead of
  # once per filename
  nodefiles = {}
  for (files, fn) in files2nodefn:
    if fn is None:
      filenodes = nodeinfo
    else:
      filenodes = [node for node in nodeinfo if fn(node)]
    node_names = frozenset(node.name for node in filenodes)
    nodefiles.update((filename, node_names) for filename in files)

  assert set(nodefiles) == (files_all | files_mc | files_vm)

  fileinfo = dict((filename, {}) for filename in nodefiles)
  ignore_nodes = set()

  for node in nodeinfo:
    if node.offline:
      ignore_nodes.add(node.name)
      continue

    nresult = all_nvinfo[node.name]

    node_files = None
    if not (nresult.fail_msg or not nresult.payload):
      fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
      # A payload without a (well-formed) file list must end up in the
      # "did not return file checksum data" error below rather than
      # crash on None.items()
      if isinstance(fingerprints, dict):
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
      del fingerprints

    test = not (node_files and isinstance(node_files, dict))
    errorif(test, constants.CV_ENODEFILECHECK, node.name,
            "Node did not return file checksum data")
    if test:
      ignore_nodes.add(node.name)
      continue

    # Build per-checksum mapping from filename to nodes having it
    for (filename, checksum) in node_files.items():
      assert filename in nodefiles
      fileinfo[filename].setdefault(checksum, set()).add(node.name)

  for (filename, checksums) in fileinfo.items():
    assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

    # Nodes having the file
    with_file = frozenset(node_name
                          for nodes in checksums.values()
                          for node_name in nodes) - ignore_nodes

    expected_nodes = nodefiles[filename] - ignore_nodes

    # Nodes missing file
    missing_file = expected_nodes - with_file

    if filename in files_opt:
      # All or no nodes
      errorif(missing_file and missing_file != expected_nodes,
              constants.CV_ECLUSTERFILECHECK, None,
              "File %s is optional, but it must exist on all or no"
              " nodes (not found on %s)",
              filename, utils.CommaJoin(utils.NiceSort(missing_file)))
    else:
      errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
              "File %s is missing from node(s) %s", filename,
              utils.CommaJoin(utils.NiceSort(missing_file)))

    # Warn if a node has a file it shouldn't
    unexpected = with_file - expected_nodes
    errorif(unexpected,
            constants.CV_ECLUSTERFILECHECK, None,
            "File %s should not exist on node(s) %s",
            filename, utils.CommaJoin(utils.NiceSort(unexpected)))

    # See if there are multiple versions of the file
    test = len(checksums) > 1
    if test:
      variants = ["variant %s on %s" %
                  (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                  for (idx, (checksum, nodes)) in
                    enumerate(sorted(checksums.items()))]
    else:
      variants = []

    errorif(test, constants.CV_ECLUSTERFILECHECK, None,
            "File %s found with %s different checksums (%s)",
            filename, len(checksums), "; ".join(variants))
2039
def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                    drbd_map):
  """Verifies the node DRBD status.

  Checks the usermode helper reported by the node (only when a helper is
  configured) and compares the DRBD minors the node reports as in use
  with the minors recorded for it in C{drbd_map}.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param instanceinfo: the dict of instances
  @param drbd_helper: the configured DRBD usermode helper
  @param drbd_map: the DRBD map as returned by
      L{ganeti.config.ConfigWriter.ComputeDRBDMap}

  """
  node = ninfo.name
  report = self._ErrorIf

  if drbd_helper:
    helper_result = nresult.get(constants.NV_DRBDHELPER, None)
    report(helper_result is None, constants.CV_ENODEDRBDHELPER, node,
           "no drbd usermode helper returned")
    if helper_result:
      (status, payload) = helper_result
      report(not status, constants.CV_ENODEDRBDHELPER, node,
             "drbd usermode helper check unsuccessful: %s", payload)
      report(status and (payload != drbd_helper),
             constants.CV_ENODEDRBDHELPER, node,
             "wrong drbd usermode helper: %s", payload)

  # Expected minors: minor -> (instance name, whether it must be active)
  node_drbd = {}
  for (minor, iname) in drbd_map[node].items():
    is_ghost = iname not in instanceinfo
    report(is_ghost, constants.CV_ECLUSTERCFG, None,
           "ghost instance '%s' in temporary DRBD map", iname)
    if is_ghost:
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      node_drbd[minor] = (iname, False)
    else:
      inst = instanceinfo[iname]
      node_drbd[minor] = (inst.name, inst.disks_active)

  # Minors the node actually reports
  used_minors = nresult.get(constants.NV_DRBDLIST, [])
  unparsable = not isinstance(used_minors, (tuple, list))
  report(unparsable, constants.CV_ENODEDRBD, node,
         "cannot parse drbd status file: %s", str(used_minors))
  if unparsable:
    # we cannot check drbd status
    return

  for (minor, (iname, must_exist)) in node_drbd.items():
    report(minor not in used_minors and must_exist,
           constants.CV_ENODEDRBD, node,
           "drbd minor %d of instance %s is not active", minor, iname)

  for minor in used_minors:
    report(minor not in node_drbd, constants.CV_ENODEDRBD, node,
           "unallocated drbd minor %d is in use", minor)
2102
def _UpdateNodeOS(self, ninfo, nresult, nimg):
  """Builds the node OS structures.

  The OS list returned by the node must be a list of seven-element
  lists; otherwise an error is reported, C{nimg.os_fail} is set and
  nothing else is stored. On success C{nimg.oslist} becomes a dict
  mapping each OS name to a list of C{(path, status, diagnose, variants,
  parameters, api_versions)} tuples, the last three being sets.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object

  """
  node = ninfo.name

  remote_os = nresult.get(constants.NV_OSLIST, None)
  invalid = (not isinstance(remote_os, list) or
             not compat.all(isinstance(entry, list) and len(entry) == 7
                            for entry in remote_os))

  self._ErrorIf(invalid, constants.CV_ENODEOS, node,
                "node hasn't returned valid OS data")

  nimg.os_fail = invalid
  if invalid:
    return

  os_dict = {}
  for (name, os_path, status, diagnose,
       variants, parameters, api_ver) in remote_os:
    # JSON has no real tuple type, so parameters arrive as a list of
    # lists; turn the inner lists into tuples so they can go into a set
    param_tuples = [tuple(param) for param in parameters]
    os_dict.setdefault(name, []).append(
      (os_path, status, diagnose,
       set(variants), set(param_tuples), set(api_ver)))

  nimg.oslist = os_dict
2143
def _VerifyNodeOS(self, ninfo, nimg, base):
  """Verifies the node OS list.

  Each OS of the node is checked for validity and for duplicate entries,
  then compared (API versions, variants, parameters) with the first entry
  of the same OS on the reference node. OSes present on only one of the
  two nodes are reported as well.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nimg: the node image object
  @param base: the 'template' node we match against (e.g. from the master)

  """
  node = ninfo.name
  report = self._ErrorIf

  assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

  def _FormatParams(params):
    """Render (key, value) pairs as "key: value" strings."""
    return ["%s: %s" % (k, v) for (k, v) in params]

  for (os_name, os_data) in nimg.oslist.items():
    assert os_data, "Empty OS status for OS %s?!" % os_name
    (f_path, f_status, f_diag, f_var, f_param, f_api) = os_data[0]

    report(not f_status, constants.CV_ENODEOS, node,
           "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
    report(len(os_data) > 1, constants.CV_ENODEOS, node,
           "OS '%s' has multiple entries (first one shadows the rest): %s",
           os_name, utils.CommaJoin([entry[0] for entry in os_data]))

    # comparisons with the 'base' image
    extra = os_name not in base.oslist
    report(extra, constants.CV_ENODEOS, node,
           "Extra OS %s not present on reference node (%s)",
           os_name, base.name)
    if extra:
      continue

    assert base.oslist[os_name], "Base node has empty OS status?"
    (_, b_status, _, b_var, b_param, b_api) = base.oslist[os_name][0]
    if not b_status:
      # base OS is invalid, skipping
      continue

    comparisons = [
      ("API version", f_api, b_api),
      ("variants list", f_var, b_var),
      ("parameters", _FormatParams(f_param), _FormatParams(b_param)),
      ]
    for (kind, ours, theirs) in comparisons:
      report(ours != theirs, constants.CV_ENODEOS, node,
             "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
             kind, os_name, base.name,
             utils.CommaJoin(sorted(ours)), utils.CommaJoin(sorted(theirs)))

  # check any missing OSes
  missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
  report(missing, constants.CV_ENODEOS, node,
         "OSes present on reference node %s but missing on this node: %s",
         base.name, utils.CommaJoin(missing))
2193
def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
  """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

  The master node, when file or shared-file storage is enabled, must
  return the list of forbidden paths, and that list must be empty. In
  every other case the node must not return the key at all.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @type is_master: bool
  @param is_master: Whether node is the master node

  """
  node = ninfo.name

  paths_expected = (is_master and
                    (constants.ENABLE_FILE_STORAGE or
                     constants.ENABLE_SHARED_FILE_STORAGE))

  if not paths_expected:
    self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                  constants.CV_ENODEFILESTORAGEPATHS, node,
                  "Node should not have returned forbidden file storage"
                  " paths")
    return

  try:
    fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
  except KeyError:
    # This should never happen
    self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                  "Node did not return forbidden file storage paths")
  else:
    self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                  "Found forbidden file storage paths: %s",
                  utils.CommaJoin(fspaths))
2224
def _VerifyOob(self, ninfo, nresult):
  """Verifies out of band functionality of a node.

  Every non-empty entry the node returned for the OOB paths check is
  reported as an error, using the entry itself as the message.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node

  """
  node = ninfo.name

  # We just have to verify the paths on master and/or master candidates
  # as the oob helper is invoked on the master
  if not (ninfo.master_candidate or ninfo.master_capable):
    return
  if constants.NV_OOB_PATHS not in nresult:
    return

  for path_result in nresult[constants.NV_OOB_PATHS]:
    self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2240
def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
  """Verifies and updates the node volume data.

  This function will update a L{NodeImage}'s internal structures
  with data from the remote call. C{nimg.lvm_fail} is set to True first
  and only cleared once a dict of volumes has been stored in
  C{nimg.volumes}; it therefore stays True when no volume group is
  configured or when the node returned unusable data.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object
  @param vg_name: the configured VG name

  """
  node = ninfo.name

  nimg.lvm_fail = True
  lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")

  if vg_name is None:
    # No volume group configured, nothing to verify
    return

  if isinstance(lvdata, basestring):
    # A string in place of the volume dict carries an error message
    self._ErrorIf(True, constants.CV_ENODELVM, node,
                  "LVM problem on node: %s", utils.SafeEncode(lvdata))
    return

  if not isinstance(lvdata, dict):
    self._ErrorIf(True, constants.CV_ENODELVM, node,
                  "rpc call to node failed (lvlist)")
    return

  nimg.volumes = lvdata
  nimg.lvm_fail = False
2270
def _UpdateNodeInstances(self, ninfo, nresult, nimg):
  """Verifies and updates the node instance list.

  If the node returned a list for the instance list key, it is stored in
  C{nimg.instances}. Anything else is reported as an error and
  C{nimg.hyp_fail} is set.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object

  """
  idata = nresult.get(constants.NV_INSTANCELIST, None)
  listing_failed = not isinstance(idata, list)

  self._ErrorIf(listing_failed, constants.CV_ENODEHV, ninfo.name,
                "rpc call to node failed (instancelist): %s",
                utils.SafeEncode(str(idata)))

  if listing_failed:
    nimg.hyp_fail = True
    return

  nimg.instances = idata
2293
def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
  """Verifies and computes a node information map.

  Stores the free memory reported by the hypervisor in C{nimg.mfree}
  and, when a volume group is configured, the value reported for that
  volume group in C{nimg.dfree}. Missing or non-integer values are
  reported as errors and leave the attribute untouched.

  @type ninfo: L{objects.Node}
  @param ninfo: the node to check
  @param nresult: the remote results for the node
  @param nimg: the node image object
  @param vg_name: the configured VG name

  """
  node = ninfo.name
  report = self._ErrorIf

  # try to read free memory (from the hypervisor)
  hv_info = nresult.get(constants.NV_HVINFO, None)
  hv_missing = not isinstance(hv_info, dict) or "memory_free" not in hv_info
  report(hv_missing, constants.CV_ENODEHV, node,
         "rpc call to node failed (hvinfo)")
  if not hv_missing:
    try:
      nimg.mfree = int(hv_info["memory_free"])
    except (ValueError, TypeError):
      report(True, constants.CV_ENODERPC, node,
             "node returned invalid nodeinfo, check hypervisor")

  # FIXME: devise a free space model for file based instances as well
  if vg_name is None:
    return

  vg_missing = (constants.NV_VGLIST not in nresult or
                vg_name not in nresult[constants.NV_VGLIST])
  report(vg_missing, constants.CV_ENODELVM, node,
         "node didn't return data for the volume group '%s'"
         " - it is either missing or broken", vg_name)
  if vg_missing:
    return

  try:
    nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
  except (ValueError, TypeError):
    report(True, constants.CV_ENODERPC, node,
           "node returned invalid LVM info, check LVM status")
2332
def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
  """Gets per-disk status information for all instances.

  For every node in C{nodelist} the disks of its primary and secondary
  instances are annotated and queried with a single multi-node mirror
  status RPC. Offline nodes, failed calls and malformed entries are
  turned into C{(False, message)} placeholders so that every disk still
  gets exactly one status per node.

  @type nodelist: list of strings
  @param nodelist: Node names
  @type node_image: dict
  @param node_image: node image objects indexed by node name (their
      C{pinst} and C{sinst} instance lists are read)
  @type instanceinfo: dict of (name, L{objects.Instance})
  @param instanceinfo: Instance objects
  @rtype: {instance: {node: [(success, payload)]}}
  @return: a dictionary of per-instance dictionaries with nodes as
      keys and disk information as values; the disk information is a
      list of tuples (success, payload)

  """
  report = self._ErrorIf

  node_disks = {}
  node_disks_devonly = {}
  diskless_instances = set()

  for nname in nodelist:
    nimg = node_image[nname]
    inst_names = list(nimg.pinst) + list(nimg.sinst)

    diskless_instances.update(
      iname for iname in inst_names
      if instanceinfo[iname].disk_template == constants.DT_DISKLESS)

    disks = []
    for iname in inst_names:
      for disk in instanceinfo[iname].disks:
        disks.append((iname, disk))

    if not disks:
      # No need to collect data
      continue

    node_disks[nname] = disks

    # Per the original note, AnnotateDiskParams already hands back copies
    # of the disks, so setting the disk ID touches only those copies
    annotated = []
    for (iname, dev) in disks:
      (anno_disk,) = AnnotateDiskParams(instanceinfo[iname], [dev], self.cfg)
      self.cfg.SetDiskID(anno_disk, nname)
      annotated.append(anno_disk)

    node_disks_devonly[nname] = annotated

  assert len(node_disks) == len(node_disks_devonly)

  # Collect data from all nodes with disks
  result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                        node_disks_devonly)

  assert len(result) == len(node_disks)

  instdisk = {}

  for (nname, nres) in result.items():
    disks = node_disks[nname]

    if nres.offline:
      # No data from this node
      data = [(False, "node offline")] * len(disks)
    else:
      msg = nres.fail_msg
      report(msg, constants.CV_ENODERPC, nname,
             "while getting disk information: %s", msg)
      if msg:
        # No data from this node
        data = [(False, msg)] * len(disks)
      else:
        data = []
        for (idx, entry) in enumerate(nres.payload):
          well_formed = isinstance(entry, (tuple, list)) and len(entry) == 2
          if not well_formed:
            logging.warning("Invalid result from node %s, entry %d: %s",
                            nname, idx, entry)
            entry = (False, "Invalid result from the remote node")
          data.append(entry)

    for ((iname, _), status) in zip(disks, data):
      instdisk.setdefault(iname, {}).setdefault(nname, []).append(status)

  # Add empty entries for diskless instances.
  for iname in diskless_instances:
    assert iname not in instdisk
    instdisk[iname] = {}

  assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                    len(nnames) <= len(instanceinfo[inst].all_nodes) and
                    compat.all(isinstance(s, (tuple, list)) and
                               len(s) == 2 for s in statuses)
                    for inst, nnames in instdisk.items()
                    for nname, statuses in nnames.items())
  if __debug__:
    instdisk_keys = set(instdisk)
    instanceinfo_keys = set(instanceinfo)
    assert instdisk_keys == instanceinfo_keys, \
      ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
       (instdisk_keys, instanceinfo_keys))

  return instdisk
@staticmethod
def _SshNodeSelector(group_uuid, all_nodes):
  """Create endless iterators for all potential SSH check hosts.

  Online nodes that are not in group C{group_uuid} are collected per
  node group; for each such group an endless iterator over its sorted
  node names is returned.

  @param group_uuid: UUID of the group whose nodes are left out
  @param all_nodes: iterable of node objects (their C{name}, C{group}
      and C{offline} attributes are read)
  @return: list with one L{itertools.cycle} per foreign node group

  """
  by_group = operator.attrgetter("group")

  candidates = sorted((node for node in all_nodes
                       if node.group != group_uuid and not node.offline),
                      key=by_group)

  # groupby only merges adjacent items, hence the sort by group above;
  # each group's members are consumed right away, before groupby advances
  return [itertools.cycle(sorted(member.name for member in members))
          for (_, members) in itertools.groupby(candidates, by_group)]
@classmethod
def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
  """Choose which nodes should talk to which other nodes.

  We will make nodes contact all nodes in their group, and one node from
  every other group.

  @warning: This algorithm has a known issue if one node group is much
    smaller than others (e.g. just one node). In such a case all other
    nodes will talk to the single node.

  @param group_nodes: node objects of the group being verified
  @param group_uuid: UUID of the group being verified
  @param all_nodes: node objects of the whole cluster
  @rtype: tuple
  @return: the sorted names of the online nodes in C{group_nodes}, and a
      dict mapping each of those names to a sorted list holding one node
      name taken from every iterator of L{_SshNodeSelector}

  """
  online_nodes = sorted(node.name for node in group_nodes if not node.offline)

  # The selectors are walked once per online node; keep them in a list so
  # a one-shot iterable cannot leave later nodes without targets
  selectors = list(cls._SshNodeSelector(group_uuid, all_nodes))

  # Builtin next() (Python 2.6 and later) instead of the .next() method,
  # which iterators no longer provide in Python 3
  targets = {}
  for name in online_nodes:
    targets[name] = sorted([next(selector) for selector in selectors])

  return (online_nodes, targets)
2468
def BuildHooksEnv(self):
  """Build hooks env.

  Cluster-Verify hooks are only run in the post phase; when they fail,
  their output is logged in the verify output and the verification
  fails.

  @rtype: dict
  @return: C{CLUSTER_TAGS} with the space-separated cluster tags, plus
      one C{NODE_TAGS_<name>} entry with the space-separated tags of
      every node in C{self.my_node_info}

  """
  cluster_tags = self.cfg.GetClusterInfo().GetTags()
  hook_env = {"CLUSTER_TAGS": " ".join(cluster_tags)}

  for node_obj in self.my_node_info.values():
    hook_env["NODE_TAGS_%s" % node_obj.name] = " ".join(node_obj.GetTags())

  return hook_env
2484
def BuildHooksNodes(self):
  """Build hooks nodes.

  @rtype: tuple
  @return: an empty list followed by C{self.my_node_names}

  """
  # NOTE(review): the two slots presumably are the pre-phase and
  # post-phase node lists - confirm against the hooks runner
  first_slot = []
  second_slot = self.my_node_names
  return (first_slot, second_slot)
2490
2491 - def Exec(self, feedback_fn):
2492 """Verify integrity of the node group, performing various test on nodes. 2493 2494 """ 2495 # This method has too many local variables. pylint: disable=R0914 2496 feedback_fn("* Verifying group '%s'" % self.group_info.name) 2497 2498 if not self.my_node_names: 2499 # empty node group 2500 feedback_fn("* Empty node group, skipping verification") 2501 return True 2502 2503 self.bad = False 2504 _ErrorIf = self._ErrorIf # pylint: disable=C0103 2505 verbose = self.op.verbose 2506 self._feedback_fn = feedback_fn 2507 2508 vg_name = self.cfg.GetVGName() 2509 drbd_helper = self.cfg.GetDRBDHelper() 2510 cluster = self.cfg.GetClusterInfo() 2511 hypervisors = cluster.enabled_hypervisors 2512 node_data_list = [self.my_node_info[name] for name in self.my_node_names] 2513 2514 i_non_redundant = [] # Non redundant instances 2515 i_non_a_balanced = [] # Non auto-balanced instances 2516 i_offline = 0 # Count of offline instances 2517 n_offline = 0 # Count of offline nodes 2518 n_drained = 0 # Count of nodes being drained 2519 node_vol_should = {} 2520 2521 # FIXME: verify OS list 2522 2523 # File verification 2524 filemap = ComputeAncillaryFiles(cluster, False) 2525 2526 # do local checksums 2527 master_node = self.master_node = self.cfg.GetMasterNode() 2528 master_ip = self.cfg.GetMasterIP() 2529 2530 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) 2531 2532 user_scripts = [] 2533 if self.cfg.GetUseExternalMipScript(): 2534 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT) 2535 2536 node_verify_param = { 2537 constants.NV_FILELIST: 2538 map(vcluster.MakeVirtualPath, 2539 utils.UniqueSequence(filename 2540 for files in filemap 2541 for filename in files)), 2542 constants.NV_NODELIST: 2543 self._SelectSshCheckNodes(node_data_list, self.group_uuid, 2544 self.all_node_info.values()), 2545 constants.NV_HYPERVISOR: hypervisors, 2546 constants.NV_HVPARAMS: 2547 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), 2548 
constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) 2549 for node in node_data_list 2550 if not node.offline], 2551 constants.NV_INSTANCELIST: hypervisors, 2552 constants.NV_VERSION: None, 2553 constants.NV_HVINFO: self.cfg.GetHypervisorType(), 2554 constants.NV_NODESETUP: None, 2555 constants.NV_TIME: None, 2556 constants.NV_MASTERIP: (master_node, master_ip), 2557 constants.NV_OSLIST: None, 2558 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(), 2559 constants.NV_USERSCRIPTS: user_scripts, 2560 } 2561 2562 if vg_name is not None: 2563 node_verify_param[constants.NV_VGLIST] = None 2564 node_verify_param[constants.NV_LVLIST] = vg_name 2565 node_verify_param[constants.NV_PVLIST] = [vg_name] 2566 2567 if drbd_helper: 2568 node_verify_param[constants.NV_DRBDLIST] = None 2569 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper 2570 2571 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: 2572 # Load file storage paths only from master node 2573 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node 2574 2575 # bridge checks 2576 # FIXME: this needs to be changed per node-group, not cluster-wide 2577 bridges = set() 2578 default_nicpp = cluster.nicparams[constants.PP_DEFAULT] 2579 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: 2580 bridges.add(default_nicpp[constants.NIC_LINK]) 2581 for instance in self.my_inst_info.values(): 2582 for nic in instance.nics: 2583 full_nic = cluster.SimpleFillNIC(nic.nicparams) 2584 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: 2585 bridges.add(full_nic[constants.NIC_LINK]) 2586 2587 if bridges: 2588 node_verify_param[constants.NV_BRIDGES] = list(bridges) 2589 2590 # Build our expected cluster state 2591 node_image = dict((node.name, self.NodeImage(offline=node.offline, 2592 name=node.name, 2593 vm_capable=node.vm_capable)) 2594 for node in node_data_list) 2595 2596 # Gather OOB paths 2597 oob_paths = [] 2598 for node in 
self.all_node_info.values(): 2599 path = SupportsOob(self.cfg, node) 2600 if path and path not in oob_paths: 2601 oob_paths.append(path) 2602 2603 if oob_paths: 2604 node_verify_param[constants.NV_OOB_PATHS] = oob_paths 2605 2606 for instance in self.my_inst_names: 2607 inst_config = self.my_inst_info[instance] 2608 if inst_config.admin_state == constants.ADMINST_OFFLINE: 2609 i_offline += 1 2610 2611 for nname in inst_config.all_nodes: 2612 if nname not in node_image: 2613 gnode = self.NodeImage(name=nname) 2614 gnode.ghost = (nname not in self.all_node_info) 2615 node_image[nname] = gnode 2616 2617 inst_config.MapLVsByNode(node_vol_should) 2618 2619 pnode = inst_config.primary_node 2620 node_image[pnode].pinst.append(instance) 2621 2622 for snode in inst_config.secondary_nodes: 2623 nimg = node_image[snode] 2624 nimg.sinst.append(instance) 2625 if pnode not in nimg.sbp: 2626 nimg.sbp[pnode] = [] 2627 nimg.sbp[pnode].append(instance) 2628 2629 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names) 2630 # The value of exclusive_storage should be the same across the group, so if 2631 # it's True for at least a node, we act as if it were set for all the nodes 2632 self._exclusive_storage = compat.any(es_flags.values()) 2633 if self._exclusive_storage: 2634 node_verify_param[constants.NV_EXCLUSIVEPVS] = True 2635 2636 # At this point, we have the in-memory data structures complete, 2637 # except for the runtime information, which we'll gather next 2638 2639 # Due to the way our RPC system works, exact response times cannot be 2640 # guaranteed (e.g. a broken node could run into a timeout). By keeping the 2641 # time before and after executing the request, we can at least have a time 2642 # window. 
2643 nvinfo_starttime = time.time() 2644 all_nvinfo = self.rpc.call_node_verify(self.my_node_names, 2645 node_verify_param, 2646 self.cfg.GetClusterName()) 2647 nvinfo_endtime = time.time() 2648 2649 if self.extra_lv_nodes and vg_name is not None: 2650 extra_lv_nvinfo = \ 2651 self.rpc.call_node_verify(self.extra_lv_nodes, 2652 {constants.NV_LVLIST: vg_name}, 2653 self.cfg.GetClusterName()) 2654 else: 2655 extra_lv_nvinfo = {} 2656 2657 all_drbd_map = self.cfg.ComputeDRBDMap() 2658 2659 feedback_fn("* Gathering disk information (%s nodes)" % 2660 len(self.my_node_names)) 2661 instdisk = self<