
Source Code for Module ganeti.cmdlib.cluster

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30   
  31  """Logical units dealing with the cluster.""" 
  32   
  33  import copy 
  34  import itertools 
  35  import logging 
  36  import operator 
  37  import os 
  38  import re 
  39  import time 
  40   
  41  from ganeti import compat 
  42  from ganeti import constants 
  43  from ganeti import errors 
  44  from ganeti import hypervisor 
  45  from ganeti import locking 
  46  from ganeti import masterd 
  47  from ganeti import netutils 
  48  from ganeti import objects 
  49  from ganeti import opcodes 
  50  from ganeti import pathutils 
  51  from ganeti import query 
  52  import ganeti.rpc.node as rpc 
  53  from ganeti import runtime 
  54  from ganeti import ssh 
  55  from ganeti import uidpool 
  56  from ganeti import utils 
  57  from ganeti import vcluster 
  58   
  59  from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \ 
  60    ResultWithJobs 
  61  from ganeti.cmdlib.common import ShareAll, RunPostHook, \ 
  62    ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \ 
  63    GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \ 
  64    GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \ 
  65    CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \ 
  66    ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \ 
  67    CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \ 
  68    CheckDiskAccessModeConsistency, GetClientCertDigest, \ 
  69    AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp, \ 
  70    CheckImageValidity, CheckDiskAccessModeConsistency, EnsureKvmdOnNodes, \ 
  71    WarnAboutFailedSshUpdates 
  72   
  73  import ganeti.masterd.instance 
74 75 76 -class LUClusterRenewCrypto(NoHooksLU):
77 """Renew the cluster's crypto tokens. 78 79 """ 80 81 _MAX_NUM_RETRIES = 3 82 REQ_BGL = False 83
84 - def ExpandNames(self):
85 self.needed_locks = { 86 locking.LEVEL_NODE: locking.ALL_SET, 87 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 88 } 89 self.share_locks = ShareAll() 90 self.share_locks[locking.LEVEL_NODE] = 0 91 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
92
93 - def CheckPrereq(self):
   94    """Check prerequisites. 95 96     This checks whether SSH key renewal is suppressed by the cluster's SSH setup configuration. 97 98     Any errors are signaled by raising errors.OpPrereqError. 99 100     """ 101     self._ssh_renewal_suppressed = \ 102       not self.cfg.GetClusterInfo().modify_ssh_setup and self.op.ssh_keys
103
104 - def _RenewNodeSslCertificates(self, feedback_fn):
105 """Renews the nodes' SSL certificates. 106 107 Note that most of this operation is done in gnt_cluster.py, this LU only 108 takes care of the renewal of the client SSL certificates. 109 110 """ 111 master_uuid = self.cfg.GetMasterNode() 112 cluster = self.cfg.GetClusterInfo() 113 114 logging.debug("Renewing the master's SSL node certificate." 115 " Master's UUID: %s.", master_uuid) 116 117 # mapping node UUIDs to client certificate digests 118 digest_map = {} 119 master_digest = utils.GetCertificateDigest( 120 cert_filename=pathutils.NODED_CLIENT_CERT_FILE) 121 digest_map[master_uuid] = master_digest 122 logging.debug("Adding the master's SSL node certificate digest to the" 123 " configuration. Master's UUID: %s, Digest: %s", 124 master_uuid, master_digest) 125 126 node_errors = {} 127 nodes = self.cfg.GetAllNodesInfo() 128 logging.debug("Renewing non-master nodes' node certificates.") 129 for (node_uuid, node_info) in nodes.items(): 130 if node_info.offline: 131 feedback_fn("* Skipping offline node %s" % node_info.name) 132 logging.debug("Skipping offline node %s (UUID: %s).", 133 node_info.name, node_uuid) 134 continue 135 if node_uuid != master_uuid: 136 logging.debug("Adding certificate digest of node '%s'.", node_uuid) 137 last_exception = None 138 for i in range(self._MAX_NUM_RETRIES): 139 try: 140 if node_info.master_candidate: 141 node_digest = GetClientCertDigest(self, node_uuid) 142 digest_map[node_uuid] = node_digest 143 logging.debug("Added the node's certificate to candidate" 144 " certificate list. Current list: %s.", 145 str(cluster.candidate_certs)) 146 break 147 except errors.OpExecError as e: 148 last_exception = e 149 logging.error("Could not fetch a non-master node's SSL node" 150 " certificate at attempt no. %s. The node's UUID" 151 " is %s, and the error was: %s.", 152 str(i), node_uuid, e) 153 else: 154 if last_exception: 155 node_errors[node_uuid] = last_exception 156 157 if node_errors: 158 msg = ("Some nodes' SSL client certificates could not be fetched." 159 " Please make sure those nodes are reachable and rerun" 160 " the operation. The affected nodes and their errors are:\n") 161 for uuid, e in node_errors.items(): 162 msg += "Node %s: %s\n" % (uuid, e) 163 feedback_fn(msg) 164 165 self.cfg.SetCandidateCerts(digest_map)
166
167 - def _RenewSshKeys(self, feedback_fn):
168 """Renew all nodes' SSH keys. 169 170 """ 171 master_uuid = self.cfg.GetMasterNode() 172 173 nodes = self.cfg.GetAllNodesInfo() 174 nodes_uuid_names = [(node_uuid, node_info.name) for (node_uuid, node_info) 175 in nodes.items() if not node_info.offline] 176 node_names = [name for (_, name) in nodes_uuid_names] 177 node_uuids = [uuid for (uuid, _) in nodes_uuid_names] 178 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 179 master_candidate_uuids = self.cfg.GetMasterCandidateUuids() 180 181 result = self.rpc.call_node_ssh_keys_renew( 182 [master_uuid], 183 node_uuids, node_names, 184 master_candidate_uuids, 185 potential_master_candidates) 186 187 # Check if there were serious errors (for example master key files not 188 # writable). 189 result[master_uuid].Raise("Could not renew the SSH keys of all nodes") 190 191 # Process any non-disruptive errors (a few nodes unreachable etc.) 192 WarnAboutFailedSshUpdates(result, master_uuid, feedback_fn)
193
194 - def Exec(self, feedback_fn):
195 if self.op.node_certificates: 196 feedback_fn("Renewing Node SSL certificates") 197 self._RenewNodeSslCertificates(feedback_fn) 198 if self.op.ssh_keys and not self._ssh_renewal_suppressed: 199 feedback_fn("Renewing SSH keys") 200 self._RenewSshKeys(feedback_fn) 201 elif self._ssh_renewal_suppressed: 202 feedback_fn("Cannot renew SSH keys if the cluster is configured to not" 203 " modify the SSH setup.")
204
205 206 -class LUClusterActivateMasterIp(NoHooksLU):
207 """Activate the master IP on the master node. 208 209 """
210 - def Exec(self, feedback_fn):
211 """Activate the master IP. 212 213 """ 214 master_params = self.cfg.GetMasterNetworkParameters() 215 ems = self.cfg.GetUseExternalMipScript() 216 result = self.rpc.call_node_activate_master_ip(master_params.uuid, 217 master_params, ems) 218 result.Raise("Could not activate the master IP")
219
220 221 -class LUClusterDeactivateMasterIp(NoHooksLU):
222 """Deactivate the master IP on the master node. 223 224 """
225 - def Exec(self, feedback_fn):
226 """Deactivate the master IP. 227 228 """ 229 master_params = self.cfg.GetMasterNetworkParameters() 230 ems = self.cfg.GetUseExternalMipScript() 231 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid, 232 master_params, ems) 233 result.Raise("Could not deactivate the master IP")
234
235 236 -class LUClusterConfigQuery(NoHooksLU):
237 """Return configuration values. 238 239 """ 240 REQ_BGL = False 241
242 - def CheckArguments(self):
243 self.cq = ClusterQuery(None, self.op.output_fields, False)
244
245 - def ExpandNames(self):
246 self.cq.ExpandNames(self)
247
248 - def DeclareLocks(self, level):
249 self.cq.DeclareLocks(self, level)
250
251 - def Exec(self, feedback_fn):
252 result = self.cq.OldStyleQuery(self) 253 254 assert len(result) == 1 255 256 return result[0]
257
258 259 -class LUClusterDestroy(LogicalUnit):
260 """Logical unit for destroying the cluster. 261 262 """ 263 HPATH = "cluster-destroy" 264 HTYPE = constants.HTYPE_CLUSTER 265 266 # Read by the job queue to detect when the cluster is gone and job files will 267 # never be available. 268 # FIXME: This variable should be removed together with the Python job queue. 269 clusterHasBeenDestroyed = False 270
271 - def BuildHooksEnv(self):
272 """Build hooks env. 273 274 """ 275 return { 276 "OP_TARGET": self.cfg.GetClusterName(), 277 }
278
279 - def BuildHooksNodes(self):
280 """Build hooks nodes. 281 282 """ 283 return ([], [])
284
285 - def CheckPrereq(self):
286 """Check prerequisites. 287 288 This checks whether the cluster is empty. 289 290 Any errors are signaled by raising errors.OpPrereqError. 291 292 """ 293 master = self.cfg.GetMasterNode() 294 295 nodelist = self.cfg.GetNodeList() 296 if len(nodelist) != 1 or nodelist[0] != master: 297 raise errors.OpPrereqError("There are still %d node(s) in" 298 " this cluster." % (len(nodelist) - 1), 299 errors.ECODE_INVAL) 300 instancelist = self.cfg.GetInstanceList() 301 if instancelist: 302 raise errors.OpPrereqError("There are still %d instance(s) in" 303 " this cluster." % len(instancelist), 304 errors.ECODE_INVAL)
305
306 - def Exec(self, feedback_fn):
307 """Destroys the cluster. 308 309 """ 310 master_params = self.cfg.GetMasterNetworkParameters() 311 312 # Run post hooks on master node before it's removed 313 RunPostHook(self, self.cfg.GetNodeName(master_params.uuid)) 314 315 ems = self.cfg.GetUseExternalMipScript() 316 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid, 317 master_params, ems) 318 result.Warn("Error disabling the master IP address", self.LogWarning) 319 320 self.wconfd.Client().PrepareClusterDestruction(self.wconfdcontext) 321 322 # signal to the job queue that the cluster is gone 323 LUClusterDestroy.clusterHasBeenDestroyed = True 324 325 return master_params.uuid
326
327 328 -class LUClusterPostInit(LogicalUnit):
329 """Logical unit for running hooks after cluster initialization. 330 331 """ 332 HPATH = "cluster-init" 333 HTYPE = constants.HTYPE_CLUSTER 334
335 - def CheckArguments(self):
336 self.master_uuid = self.cfg.GetMasterNode() 337 self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo()) 338 339 # TODO: When Issue 584 is solved, and None is properly parsed when used 340 # as a default value, ndparams.get(.., None) can be changed to 341 # ndparams[..] to access the values directly 342 343 # OpenvSwitch: Warn user if link is missing 344 if (self.master_ndparams[constants.ND_OVS] and not 345 self.master_ndparams.get(constants.ND_OVS_LINK, None)): 346 self.LogInfo("No physical interface for OpenvSwitch was given." 347 " OpenvSwitch will not have an outside connection. This" 348 " might not be what you want.")
349
350 - def BuildHooksEnv(self):
351 """Build hooks env. 352 353 """ 354 return { 355 "OP_TARGET": self.cfg.GetClusterName(), 356 }
357
358 - def BuildHooksNodes(self):
359 """Build hooks nodes. 360 361 """ 362 return ([], [self.cfg.GetMasterNode()])
363
364 - def Exec(self, feedback_fn):
  365     """Create and configure Open vSwitch 366 367     """ 368     if self.master_ndparams[constants.ND_OVS]: 369       result = self.rpc.call_node_configure_ovs( 370                  self.master_uuid, 371                  self.master_ndparams[constants.ND_OVS_NAME], 372                  self.master_ndparams.get(constants.ND_OVS_LINK, None)) 373       result.Raise("Could not successfully configure Open vSwitch") 374 375     return True
376
377 378 -class ClusterQuery(QueryBase):
379 FIELDS = query.CLUSTER_FIELDS 380 381 #: Do not sort (there is only one item) 382 SORT_FIELD = None 383
384 - def ExpandNames(self, lu):
385 lu.needed_locks = {} 386 387 # The following variables interact with _QueryBase._GetNames 388 self.wanted = locking.ALL_SET 389 self.do_locking = self.use_locking 390 391 if self.do_locking: 392 raise errors.OpPrereqError("Can not use locking for cluster queries", 393 errors.ECODE_INVAL)
394
395 - def DeclareLocks(self, lu, level):
396 pass
397
398 - def _GetQueryData(self, lu):
399 """Computes the list of nodes and their attributes. 400 401 """ 402 if query.CQ_CONFIG in self.requested_data: 403 cluster = lu.cfg.GetClusterInfo() 404 nodes = lu.cfg.GetAllNodesInfo() 405 else: 406 cluster = NotImplemented 407 nodes = NotImplemented 408 409 if query.CQ_QUEUE_DRAINED in self.requested_data: 410 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE) 411 else: 412 drain_flag = NotImplemented 413 414 if query.CQ_WATCHER_PAUSE in self.requested_data: 415 master_node_uuid = lu.cfg.GetMasterNode() 416 417 result = lu.rpc.call_get_watcher_pause(master_node_uuid) 418 result.Raise("Can't retrieve watcher pause from master node '%s'" % 419 lu.cfg.GetMasterNodeName()) 420 421 watcher_pause = result.payload 422 else: 423 watcher_pause = NotImplemented 424 425 return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
426
427 428 -class LUClusterQuery(NoHooksLU):
429 """Query cluster configuration. 430 431 """ 432 REQ_BGL = False 433
434 - def ExpandNames(self):
435 self.needed_locks = {}
436
437 - def Exec(self, feedback_fn):
438 """Return cluster config. 439 440 """ 441 cluster = self.cfg.GetClusterInfo() 442 os_hvp = {} 443 444 # Filter just for enabled hypervisors 445 for os_name, hv_dict in cluster.os_hvp.items(): 446 os_hvp[os_name] = {} 447 for hv_name, hv_params in hv_dict.items(): 448 if hv_name in cluster.enabled_hypervisors: 449 os_hvp[os_name][hv_name] = hv_params 450 451 # Convert ip_family to ip_version 452 primary_ip_version = constants.IP4_VERSION 453 if cluster.primary_ip_family == netutils.IP6Address.family: 454 primary_ip_version = constants.IP6_VERSION 455 456 result = { 457 "software_version": constants.RELEASE_VERSION, 458 "protocol_version": constants.PROTOCOL_VERSION, 459 "config_version": constants.CONFIG_VERSION, 460 "os_api_version": max(constants.OS_API_VERSIONS), 461 "export_version": constants.EXPORT_VERSION, 462 "vcs_version": constants.VCS_VERSION, 463 "architecture": runtime.GetArchInfo(), 464 "name": cluster.cluster_name, 465 "master": self.cfg.GetMasterNodeName(), 466 "default_hypervisor": cluster.primary_hypervisor, 467 "enabled_hypervisors": cluster.enabled_hypervisors, 468 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name]) 469 for hypervisor_name in cluster.enabled_hypervisors]), 470 "os_hvp": os_hvp, 471 "beparams": cluster.beparams, 472 "osparams": cluster.osparams, 473 "ipolicy": cluster.ipolicy, 474 "nicparams": cluster.nicparams, 475 "ndparams": cluster.ndparams, 476 "diskparams": cluster.diskparams, 477 "candidate_pool_size": cluster.candidate_pool_size, 478 "max_running_jobs": cluster.max_running_jobs, 479 "max_tracked_jobs": cluster.max_tracked_jobs, 480 "mac_prefix": cluster.mac_prefix, 481 "master_netdev": cluster.master_netdev, 482 "master_netmask": cluster.master_netmask, 483 "use_external_mip_script": cluster.use_external_mip_script, 484 "volume_group_name": cluster.volume_group_name, 485 "drbd_usermode_helper": cluster.drbd_usermode_helper, 486 "file_storage_dir": cluster.file_storage_dir, 487 "shared_file_storage_dir": cluster.shared_file_storage_dir, 488 "maintain_node_health": cluster.maintain_node_health, 489 "ctime": cluster.ctime, 490 "mtime": cluster.mtime, 491 "uuid": cluster.uuid, 492 "tags": list(cluster.GetTags()), 493 "uid_pool": cluster.uid_pool, 494 "default_iallocator": cluster.default_iallocator, 495 "default_iallocator_params": cluster.default_iallocator_params, 496 "reserved_lvs": cluster.reserved_lvs, 497 "primary_ip_version": primary_ip_version, 498 "prealloc_wipe_disks": cluster.prealloc_wipe_disks, 499 "hidden_os": cluster.hidden_os, 500 "blacklisted_os": cluster.blacklisted_os, 501 "enabled_disk_templates": cluster.enabled_disk_templates, 502 "install_image": cluster.install_image, 503 "instance_communication_network": cluster.instance_communication_network, 504 "compression_tools": cluster.compression_tools, 505 "enabled_user_shutdown": cluster.enabled_user_shutdown, 506 } 507 508 return result
509
510 511 -class LUClusterRedistConf(NoHooksLU):
512 """Force the redistribution of cluster configuration. 513 514 This is a very simple LU. 515 516 """ 517 REQ_BGL = False 518
519 - def ExpandNames(self):
520 self.needed_locks = { 521 locking.LEVEL_NODE: locking.ALL_SET, 522 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 523 } 524 self.share_locks = ShareAll()
525
526 - def Exec(self, feedback_fn):
527 """Redistribute the configuration. 528 529 """ 530 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn) 531 RedistributeAncillaryFiles(self)
532
533 534 -class LUClusterRename(LogicalUnit):
535 """Rename the cluster. 536 537 """ 538 HPATH = "cluster-rename" 539 HTYPE = constants.HTYPE_CLUSTER 540
541 - def BuildHooksEnv(self):
542 """Build hooks env. 543 544 """ 545 return { 546 "OP_TARGET": self.cfg.GetClusterName(), 547 "NEW_NAME": self.op.name, 548 }
549
550 - def BuildHooksNodes(self):
551 """Build hooks nodes. 552 553 """ 554 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
555
556 - def CheckPrereq(self):
557 """Verify that the passed name is a valid one. 558 559 """ 560 hostname = netutils.GetHostname(name=self.op.name, 561 family=self.cfg.GetPrimaryIPFamily()) 562 563 new_name = hostname.name 564 self.ip = new_ip = hostname.ip 565 old_name = self.cfg.GetClusterName() 566 old_ip = self.cfg.GetMasterIP() 567 if new_name == old_name and new_ip == old_ip: 568 raise errors.OpPrereqError("Neither the name nor the IP address of the" 569 " cluster has changed", 570 errors.ECODE_INVAL) 571 if new_ip != old_ip: 572 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): 573 raise errors.OpPrereqError("The given cluster IP address (%s) is" 574 " reachable on the network" % 575 new_ip, errors.ECODE_NOTUNIQUE) 576 577 self.op.name = new_name
578
579 - def Exec(self, feedback_fn):
580 """Rename the cluster. 581 582 """ 583 clustername = self.op.name 584 new_ip = self.ip 585 586 # shutdown the master IP 587 master_params = self.cfg.GetMasterNetworkParameters() 588 ems = self.cfg.GetUseExternalMipScript() 589 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid, 590 master_params, ems) 591 result.Raise("Could not disable the master role") 592 593 try: 594 cluster = self.cfg.GetClusterInfo() 595 cluster.cluster_name = clustername 596 cluster.master_ip = new_ip 597 self.cfg.Update(cluster, feedback_fn) 598 599 # update the known hosts file 600 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE) 601 node_list = self.cfg.GetOnlineNodeList() 602 try: 603 node_list.remove(master_params.uuid) 604 except ValueError: 605 pass 606 UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE) 607 finally: 608 master_params.ip = new_ip 609 result = self.rpc.call_node_activate_master_ip(master_params.uuid, 610 master_params, ems) 611 result.Warn("Could not re-enable the master role on the master," 612 " please restart manually", self.LogWarning) 613 614 return clustername
615
616 617 -class LUClusterRepairDiskSizes(NoHooksLU):
618 """Verifies the cluster disks sizes. 619 620 """ 621 REQ_BGL = False 622
623 - def ExpandNames(self):
  624     if self.op.instances: 625       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances) 626       # Not getting the node allocation lock as only a specific set of 627       # instances (and their nodes) is going to be acquired 628       self.needed_locks = { 629         locking.LEVEL_NODE_RES: [], 630         locking.LEVEL_INSTANCE: self.wanted_names, 631         } 632       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE 633     else: 634       self.wanted_names = None 635       self.needed_locks = { 636         locking.LEVEL_NODE_RES: locking.ALL_SET, 637         locking.LEVEL_INSTANCE: locking.ALL_SET, 638 639         # This opcode acquires the node locks for all instances 640         locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 641         } 642 643     self.share_locks = { 644       locking.LEVEL_NODE_RES: 1, 645       locking.LEVEL_INSTANCE: 0, 646       locking.LEVEL_NODE_ALLOC: 1, 647       }
648
649 - def DeclareLocks(self, level):
650 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None: 651 self._LockInstancesNodes(primary_only=True, level=level)
652
653 - def CheckPrereq(self):
654 """Check prerequisites. 655 656 This only checks the optional instance list against the existing names. 657 658 """ 659 if self.wanted_names is None: 660 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) 661 662 self.wanted_instances = \ 663 map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
664
665 - def _EnsureChildSizes(self, disk):
666 """Ensure children of the disk have the needed disk size. 667 668 This is valid mainly for DRBD8 and fixes an issue where the 669 children have smaller disk size. 670 671 @param disk: an L{ganeti.objects.Disk} object 672 673 """ 674 if disk.dev_type == constants.DT_DRBD8: 675 assert disk.children, "Empty children for DRBD8?" 676 fchild = disk.children[0] 677 mismatch = fchild.size < disk.size 678 if mismatch: 679 self.LogInfo("Child disk has size %d, parent %d, fixing", 680 fchild.size, disk.size) 681 fchild.size = disk.size 682 683 # and we recurse on this child only, not on the metadev 684 return self._EnsureChildSizes(fchild) or mismatch 685 else: 686 return False
687
688 - def Exec(self, feedback_fn):
689 """Verify the size of cluster disks. 690 691 """ 692 # TODO: check child disks too 693 # TODO: check differences in size between primary/secondary nodes 694 per_node_disks = {} 695 for instance in self.wanted_instances: 696 pnode = instance.primary_node 697 if pnode not in per_node_disks: 698 per_node_disks[pnode] = [] 699 for idx, disk in enumerate(self.cfg.GetInstanceDisks(instance.uuid)): 700 per_node_disks[pnode].append((instance, idx, disk)) 701 702 assert not (frozenset(per_node_disks.keys()) - 703 frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \ 704 "Not owning correct locks" 705 assert not self.owned_locks(locking.LEVEL_NODE) 706 707 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, 708 per_node_disks.keys()) 709 710 changed = [] 711 for node_uuid, dskl in per_node_disks.items(): 712 if not dskl: 713 # no disks on the node 714 continue 715 716 newl = [([v[2].Copy()], v[0]) for v in dskl] 717 node_name = self.cfg.GetNodeName(node_uuid) 718 result = self.rpc.call_blockdev_getdimensions(node_uuid, newl) 719 if result.fail_msg: 720 self.LogWarning("Failure in blockdev_getdimensions call to node" 721 " %s, ignoring", node_name) 722 continue 723 if len(result.payload) != len(dskl): 724 logging.warning("Invalid result from node %s: len(dksl)=%d," 725 " result.payload=%s", node_name, len(dskl), 726 result.payload) 727 self.LogWarning("Invalid result from node %s, ignoring node results", 728 node_name) 729 continue 730 for ((instance, idx, disk), dimensions) in zip(dskl, result.payload): 731 if dimensions is None: 732 self.LogWarning("Disk %d of instance %s did not return size" 733 " information, ignoring", idx, instance.name) 734 continue 735 if not isinstance(dimensions, (tuple, list)): 736 self.LogWarning("Disk %d of instance %s did not return valid" 737 " dimension information, ignoring", idx, 738 instance.name) 739 continue 740 (size, spindles) = dimensions 741 if not isinstance(size, (int, long)): 742 self.LogWarning("Disk %d of instance %s did not return valid" 743 " size information, ignoring", idx, instance.name) 744 continue 745 size = size >> 20 746 if size != disk.size: 747 self.LogInfo("Disk %d of instance %s has mismatched size," 748 " correcting: recorded %d, actual %d", idx, 749 instance.name, disk.size, size) 750 disk.size = size 751 self.cfg.Update(disk, feedback_fn) 752 changed.append((instance.name, idx, "size", size)) 753 if es_flags[node_uuid]: 754 if spindles is None: 755 self.LogWarning("Disk %d of instance %s did not return valid" 756 " spindles information, ignoring", idx, 757 instance.name) 758 elif disk.spindles is None or disk.spindles != spindles: 759 self.LogInfo("Disk %d of instance %s has mismatched spindles," 760 " correcting: recorded %s, actual %s", 761 idx, instance.name, disk.spindles, spindles) 762 disk.spindles = spindles 763 self.cfg.Update(disk, feedback_fn) 764 changed.append((instance.name, idx, "spindles", disk.spindles)) 765 if self._EnsureChildSizes(disk): 766 self.cfg.Update(disk, feedback_fn) 767 changed.append((instance.name, idx, "size", disk.size)) 768 return changed
769
770 771 -def _ValidateNetmask(cfg, netmask):
772 """Checks if a netmask is valid. 773 774 @type cfg: L{config.ConfigWriter} 775 @param cfg: cluster configuration 776 @type netmask: int 777 @param netmask: netmask to be verified 778 @raise errors.OpPrereqError: if the validation fails 779 780 """ 781 ip_family = cfg.GetPrimaryIPFamily() 782 try: 783 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family) 784 except errors.ProgrammerError: 785 raise errors.OpPrereqError("Invalid primary ip family: %s." % 786 ip_family, errors.ECODE_INVAL) 787 if not ipcls.ValidateNetmask(netmask): 788 raise errors.OpPrereqError("CIDR netmask (%s) not valid" % 789 (netmask), errors.ECODE_INVAL)
790
791 792 -def CheckFileBasedStoragePathVsEnabledDiskTemplates( 793 logging_warn_fn, file_storage_dir, enabled_disk_templates, 794 file_disk_template):
795 """Checks whether the given file-based storage directory is acceptable. 796 797 Note: This function is public, because it is also used in bootstrap.py. 798 799 @type logging_warn_fn: function 800 @param logging_warn_fn: function which accepts a string and logs it 801 @type file_storage_dir: string 802 @param file_storage_dir: the directory to be used for file-based instances 803 @type enabled_disk_templates: list of string 804 @param enabled_disk_templates: the list of enabled disk templates 805 @type file_disk_template: string 806 @param file_disk_template: the file-based disk template for which the 807 path should be checked 808 809 """ 810 assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes( 811 constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER 812 )) 813 814 file_storage_enabled = file_disk_template in enabled_disk_templates 815 if file_storage_dir is not None: 816 if file_storage_dir == "": 817 if file_storage_enabled: 818 raise errors.OpPrereqError( 819 "Unsetting the '%s' storage directory while having '%s' storage" 820 " enabled is not permitted." % 821 (file_disk_template, file_disk_template), 822 errors.ECODE_INVAL) 823 else: 824 if not file_storage_enabled: 825 logging_warn_fn( 826 "Specified a %s storage directory, although %s storage is not" 827 " enabled." % (file_disk_template, file_disk_template)) 828 else: 829 raise errors.ProgrammerError("Received %s storage dir with value" 830 " 'None'." % file_disk_template)
831
832 833 -def CheckFileStoragePathVsEnabledDiskTemplates( 834 logging_warn_fn, file_storage_dir, enabled_disk_templates):
835 """Checks whether the given file storage directory is acceptable. 836 837 @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates} 838 839 """ 840 CheckFileBasedStoragePathVsEnabledDiskTemplates( 841 logging_warn_fn, file_storage_dir, enabled_disk_templates, 842 constants.DT_FILE)
843
844 845 -def CheckSharedFileStoragePathVsEnabledDiskTemplates( 846 logging_warn_fn, file_storage_dir, enabled_disk_templates):
847 """Checks whether the given shared file storage directory is acceptable. 848 849 @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates} 850 851 """ 852 CheckFileBasedStoragePathVsEnabledDiskTemplates( 853 logging_warn_fn, file_storage_dir, enabled_disk_templates, 854 constants.DT_SHARED_FILE)
855
856 857 -def CheckGlusterStoragePathVsEnabledDiskTemplates( 858 logging_warn_fn, file_storage_dir, enabled_disk_templates):
859 """Checks whether the given gluster storage directory is acceptable. 860 861 @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates} 862 863 """ 864 CheckFileBasedStoragePathVsEnabledDiskTemplates( 865 logging_warn_fn, file_storage_dir, enabled_disk_templates, 866 constants.DT_GLUSTER)
867
868 869 -def CheckCompressionTools(tools):
870 """Check whether the provided compression tools look like executables. 871 872 @type tools: list of string 873 @param tools: The tools provided as opcode input 874 875 """ 876 regex = re.compile('^[-_a-zA-Z0-9]+$') 877 illegal_tools = [t for t in tools if not regex.match(t)] 878 879 if illegal_tools: 880 raise errors.OpPrereqError( 881 "The tools '%s' contain illegal characters: only alphanumeric values," 882 " dashes, and underscores are allowed" % ", ".join(illegal_tools), 883 errors.ECODE_INVAL 884 ) 885 886 if constants.IEC_GZIP not in tools: 887 raise errors.OpPrereqError("For compatibility reasons, the %s utility must" 888 " be present among the compression tools" % 889 constants.IEC_GZIP, errors.ECODE_INVAL) 890 891 if constants.IEC_NONE in tools: 892 raise errors.OpPrereqError("%s is a reserved value used for no compression," 893 " and cannot be used as the name of a tool" % 894 constants.IEC_NONE, errors.ECODE_INVAL)
895
896 897 -class LUClusterSetParams(LogicalUnit):
898 """Change the parameters of the cluster. 899 900 """ 901 HPATH = "cluster-modify" 902 HTYPE = constants.HTYPE_CLUSTER 903 REQ_BGL = False 904
905 - def CheckArguments(self):
  906     """Check parameters. 907 908     """ 909     if self.op.uid_pool: 910       uidpool.CheckUidPool(self.op.uid_pool) 911 912     if self.op.add_uids: 913       uidpool.CheckUidPool(self.op.add_uids) 914 915     if self.op.remove_uids: 916       uidpool.CheckUidPool(self.op.remove_uids) 917 918     if self.op.mac_prefix: 919       self.op.mac_prefix = \ 920         utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix) 921 922     if self.op.master_netmask is not None: 923       _ValidateNetmask(self.cfg, self.op.master_netmask) 924 925     if self.op.diskparams: 926       for dt_params in self.op.diskparams.values(): 927         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES) 928       try: 929         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS) 930         CheckDiskAccessModeValidity(self.op.diskparams) 931       except errors.OpPrereqError, err: 932         raise errors.OpPrereqError("While verifying diskparams options: %s" % err, 933                                    errors.ECODE_INVAL) 934 935     if self.op.install_image is not None: 936       CheckImageValidity(self.op.install_image, 937                          "Install image must be an absolute path or a URL")
938
939 - def ExpandNames(self):
940 # FIXME: in the future maybe other cluster params won't require checking on 941 # all nodes to be modified. 942 # FIXME: This opcode changes cluster-wide settings. Is acquiring all 943 # resource locks the right thing, shouldn't it be the BGL instead? 944 self.needed_locks = { 945 locking.LEVEL_NODE: locking.ALL_SET, 946 locking.LEVEL_INSTANCE: locking.ALL_SET, 947 locking.LEVEL_NODEGROUP: locking.ALL_SET, 948 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 949 } 950 self.share_locks = ShareAll()
951
952 - def BuildHooksEnv(self):
953 """Build hooks env. 954 955 """ 956 return { 957 "OP_TARGET": self.cfg.GetClusterName(), 958 "NEW_VG_NAME": self.op.vg_name, 959 }
960
961 - def BuildHooksNodes(self):
962 """Build hooks nodes. 963 964 """ 965 mn = self.cfg.GetMasterNode() 966 return ([mn], [mn])
967
968 - def _CheckVgName(self, node_uuids, enabled_disk_templates, 969 new_enabled_disk_templates):
970 """Check the consistency of the vg name on all nodes and in case it gets 971 unset whether there are instances still using it. 972 973 """ 974 lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates) 975 lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates, 976 new_enabled_disk_templates) 977 current_vg_name = self.cfg.GetVGName() 978 979 if self.op.vg_name == '': 980 if lvm_is_enabled: 981 raise errors.OpPrereqError("Cannot unset volume group if lvm-based" 982 " disk templates are or get enabled.", 983 errors.ECODE_INVAL) 984 985 if self.op.vg_name is None: 986 if current_vg_name is None and lvm_is_enabled: 987 raise errors.OpPrereqError("Please specify a volume group when" 988 " enabling lvm-based disk-templates.", 989 errors.ECODE_INVAL) 990 991 if self.op.vg_name is not None and not self.op.vg_name: 992 if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN): 993 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based" 994 " instances exist", errors.ECODE_INVAL) 995 996 if (self.op.vg_name is not None and lvm_is_enabled) or \ 997 (self.cfg.GetVGName() is not None and lvm_gets_enabled): 998 self._CheckVgNameOnNodes(node_uuids)
999
1000 - def _CheckVgNameOnNodes(self, node_uuids):
1001 """Check the status of the volume group on each node. 1002 1003 """ 1004 vglist = self.rpc.call_vg_list(node_uuids) 1005 for node_uuid in node_uuids: 1006 msg = vglist[node_uuid].fail_msg 1007 if msg: 1008 # ignoring down node 1009 self.LogWarning("Error while gathering data on node %s" 1010 " (ignoring node): %s", 1011 self.cfg.GetNodeName(node_uuid), msg) 1012 continue 1013 vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload, 1014 self.op.vg_name, 1015 constants.MIN_VG_SIZE) 1016 if vgstatus: 1017 raise errors.OpPrereqError("Error on node '%s': %s" % 1018 (self.cfg.GetNodeName(node_uuid), vgstatus), 1019 errors.ECODE_ENVIRON)
1020 1021 @staticmethod
1022 - def _GetDiskTemplateSetsInner(op_enabled_disk_templates, 1023 old_enabled_disk_templates):
1024 """Computes three sets of disk templates. 1025 1026 @see: C{_GetDiskTemplateSets} for more details. 1027 1028 """ 1029 enabled_disk_templates = None 1030 new_enabled_disk_templates = [] 1031 disabled_disk_templates = [] 1032 if op_enabled_disk_templates: 1033 enabled_disk_templates = op_enabled_disk_templates 1034 new_enabled_disk_templates = \ 1035 list(set(enabled_disk_templates) 1036 - set(old_enabled_disk_templates)) 1037 disabled_disk_templates = \ 1038 list(set(old_enabled_disk_templates) 1039 - set(enabled_disk_templates)) 1040 else: 1041 enabled_disk_templates = old_enabled_disk_templates 1042 return (enabled_disk_templates, new_enabled_disk_templates, 1043 disabled_disk_templates)
1044
1045 - def _GetDiskTemplateSets(self, cluster):
1046 """Computes three sets of disk templates. 1047 1048 The three sets are: 1049 - disk templates that will be enabled after this operation (no matter if 1050 they were enabled before or not) 1051 - disk templates that get enabled by this operation (thus haven't been 1052 enabled before.) 1053 - disk templates that get disabled by this operation 1054 1055 """ 1056 return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates, 1057 cluster.enabled_disk_templates)
1058
1059 - def _CheckIpolicy(self, cluster, enabled_disk_templates):
1060 """Checks the ipolicy. 1061 1062 @type cluster: C{objects.Cluster} 1063 @param cluster: the cluster's configuration 1064 @type enabled_disk_templates: list of string 1065 @param enabled_disk_templates: list of (possibly newly) enabled disk 1066 templates 1067 1068 """ 1069 # FIXME: write unit tests for this 1070 if self.op.ipolicy: 1071 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy, 1072 group_policy=False) 1073 1074 CheckIpolicyVsDiskTemplates(self.new_ipolicy, 1075 enabled_disk_templates) 1076 1077 all_instances = self.cfg.GetAllInstancesInfo().values() 1078 violations = set() 1079 for group in self.cfg.GetAllNodeGroupsInfo().values(): 1080 instances = frozenset( 1081 [inst for inst in all_instances 1082 if compat.any(nuuid in group.members 1083 for nuuid in self.cfg.GetInstanceNodes(inst.uuid))]) 1084 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) 1085 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group) 1086 new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances, 1087 self.cfg) 1088 if new: 1089 violations.update(new) 1090 1091 if violations: 1092 self.LogWarning("After the ipolicy change the following instances" 1093 " violate them: %s", 1094 utils.CommaJoin(utils.NiceSort(violations))) 1095 else: 1096 CheckIpolicyVsDiskTemplates(cluster.ipolicy, 1097 enabled_disk_templates)
1098
1099 - def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
1100 """Checks whether the set DRBD helper actually exists on the nodes. 1101 1102 @type drbd_helper: string 1103 @param drbd_helper: path of the drbd usermode helper binary 1104 @type node_uuids: list of strings 1105 @param node_uuids: list of node UUIDs to check for the helper 1106 1107 """ 1108 # checks given drbd helper on all nodes 1109 helpers = self.rpc.call_drbd_helper(node_uuids) 1110 for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids): 1111 if ninfo.offline: 1112 self.LogInfo("Not checking drbd helper on offline node %s", 1113 ninfo.name) 1114 continue 1115 msg = helpers[ninfo.uuid].fail_msg 1116 if msg: 1117 raise errors.OpPrereqError("Error checking drbd helper on node" 1118 " '%s': %s" % (ninfo.name, msg), 1119 errors.ECODE_ENVIRON) 1120 node_helper = helpers[ninfo.uuid].payload 1121 if node_helper != drbd_helper: 1122 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" % 1123 (ninfo.name, node_helper), 1124 errors.ECODE_ENVIRON)
1125
1126 - def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
 1127     """Check the DRBD usermode helper. 1128 1129     @type node_uuids: list of strings 1130     @param node_uuids: a list of nodes' UUIDs 1131     @type drbd_enabled: boolean 1132     @param drbd_enabled: whether DRBD will be enabled after this operation 1133       (no matter if it was disabled before or not) 1134     @type drbd_gets_enabled: boolean 1135     @param drbd_gets_enabled: true if DRBD was disabled before this 1136       operation, but will be enabled afterwards 1137 1138     """ 1139     if self.op.drbd_helper == '': 1140       if drbd_enabled: 1141         raise errors.OpPrereqError("Cannot disable drbd helper while" 1142                                    " DRBD is enabled.", errors.ECODE_STATE) 1143       if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8): 1144         raise errors.OpPrereqError("Cannot disable drbd helper while" 1145                                    " drbd-based instances exist", 1146                                    errors.ECODE_INVAL) 1147 1148     else: 1149       if self.op.drbd_helper is not None and drbd_enabled: 1150         self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids) 1151       else: 1152         if drbd_gets_enabled: 1153           current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper 1154           if current_drbd_helper is not None: 1155             self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids) 1156           else: 1157             raise errors.OpPrereqError("Cannot enable DRBD without a" 1158                                        " DRBD usermode helper set.", 1159                                        errors.ECODE_STATE)
1160
1161 - def _CheckInstancesOfDisabledDiskTemplates( 1162 self, disabled_disk_templates):
1163 """Check whether we try to disable a disk template that is in use. 1164 1165 @type disabled_disk_templates: list of string 1166 @param disabled_disk_templates: list of disk templates that are going to 1167 be disabled by this operation 1168 1169 """ 1170 for disk_template in disabled_disk_templates: 1171 if self.cfg.HasAnyDiskOfType(disk_template): 1172 raise errors.OpPrereqError( 1173 "Cannot disable disk template '%s', because there is at least one" 1174 " instance using it." % disk_template, 1175 errors.ECODE_STATE)
1176 1177 @staticmethod
1178 - def _CheckInstanceCommunicationNetwork(network, warning_fn):
1179 """Check whether an existing network is configured for instance 1180 communication. 1181 1182 Checks whether an existing network is configured with the 1183 parameters that are advisable for instance communication, and 1184 otherwise issue security warnings. 1185 1186 @type network: L{ganeti.objects.Network} 1187 @param network: L{ganeti.objects.Network} object whose 1188 configuration is being checked 1189 @type warning_fn: function 1190 @param warning_fn: function used to print warnings 1191 @rtype: None 1192 @return: None 1193 1194 """ 1195 def _MaybeWarn(err, val, default): 1196 if val != default: 1197 warning_fn("Supplied instance communication network '%s' %s '%s'," 1198 " this might pose a security risk (default is '%s').", 1199 network.name, err, val, default)
1200 1201 if network.network is None: 1202 raise errors.OpPrereqError("Supplied instance communication network '%s'" 1203 " must have an IPv4 network address.", 1204 network.name) 1205 1206 _MaybeWarn("has an IPv4 gateway", network.gateway, None) 1207 _MaybeWarn("has a non-standard IPv4 network address", network.network, 1208 constants.INSTANCE_COMMUNICATION_NETWORK4) 1209 _MaybeWarn("has an IPv6 gateway", network.gateway6, None) 1210 _MaybeWarn("has a non-standard IPv6 network address", network.network6, 1211 constants.INSTANCE_COMMUNICATION_NETWORK6) 1212 _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix, 1213 constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
1214
1215 - def CheckPrereq(self):
1216 """Check prerequisites. 1217 1218 This checks whether the given params don't conflict and 1219 if the given volume group is valid. 1220 1221 """ 1222 node_uuids = self.owned_locks(locking.LEVEL_NODE) 1223 self.cluster = cluster = self.cfg.GetClusterInfo() 1224 1225 vm_capable_node_uuids = [node.uuid 1226 for node in self.cfg.GetAllNodesInfo().values() 1227 if node.uuid in node_uuids and node.vm_capable] 1228 1229 (enabled_disk_templates, new_enabled_disk_templates, 1230 disabled_disk_templates) = self._GetDiskTemplateSets(cluster) 1231 self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates) 1232 1233 self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates, 1234 new_enabled_disk_templates) 1235 1236 if self.op.file_storage_dir is not None: 1237 CheckFileStoragePathVsEnabledDiskTemplates( 1238 self.LogWarning, self.op.file_storage_dir, enabled_disk_templates) 1239 1240 if self.op.shared_file_storage_dir is not None: 1241 CheckSharedFileStoragePathVsEnabledDiskTemplates( 1242 self.LogWarning, self.op.shared_file_storage_dir, 1243 enabled_disk_templates) 1244 1245 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates 1246 drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates 1247 self._CheckDrbdHelper(vm_capable_node_uuids, 1248 drbd_enabled, drbd_gets_enabled) 1249 1250 # validate params changes 1251 if self.op.beparams: 1252 objects.UpgradeBeParams(self.op.beparams) 1253 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) 1254 self.new_beparams = cluster.SimpleFillBE(self.op.beparams) 1255 1256 if self.op.ndparams: 1257 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) 1258 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams) 1259 1260 # TODO: we need a more general way to handle resetting 1261 # cluster-level parameters to default values 1262 if self.new_ndparams["oob_program"] == "": 1263 self.new_ndparams["oob_program"] = \ 1264 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM] 1265 1266 if self.op.hv_state: 1267 new_hv_state = MergeAndVerifyHvState(self.op.hv_state, 1268 self.cluster.hv_state_static) 1269 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values)) 1270 for hv, values in new_hv_state.items()) 1271 1272 if self.op.disk_state: 1273 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, 1274 self.cluster.disk_state_static) 1275 self.new_disk_state = \ 1276 dict((storage, dict((name, cluster.SimpleFillDiskState(values)) 1277 for name, values in svalues.items())) 1278 for storage, svalues in new_disk_state.items()) 1279 1280 self._CheckIpolicy(cluster, enabled_disk_templates) 1281 1282 if self.op.nicparams: 1283 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) 1284 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) 1285 objects.NIC.CheckParameterSyntax(self.new_nicparams) 1286 nic_errors = [] 1287 1288 # check all instances for consistency 1289 for instance in self.cfg.GetAllInstancesInfo().values(): 1290 for nic_idx, nic in enumerate(instance.nics): 1291 params_copy = copy.deepcopy(nic.nicparams) 1292 params_filled = objects.FillDict(self.new_nicparams, params_copy) 1293 1294 # check parameter syntax 1295 try: 1296 objects.NIC.CheckParameterSyntax(params_filled) 1297 except errors.ConfigurationError, err: 1298 nic_errors.append("Instance %s, nic/%d: %s" % 1299 (instance.name, nic_idx, err)) 1300 1301 # if we're moving instances to routed, check that they have an ip 1302 target_mode = params_filled[constants.NIC_MODE] 1303 if target_mode == 
constants.NIC_MODE_ROUTED and not nic.ip: 1304 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" 1305 " address" % (instance.name, nic_idx)) 1306 if nic_errors: 1307 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % 1308 "\n".join(nic_errors), errors.ECODE_INVAL) 1309 1310 # hypervisor list/parameters 1311 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) 1312 if self.op.hvparams: 1313 for hv_name, hv_dict in self.op.hvparams.items(): 1314 if hv_name not in self.new_hvparams: 1315 self.new_hvparams[hv_name] = hv_dict 1316 else: 1317 self.new_hvparams[hv_name].update(hv_dict) 1318 1319 # disk template parameters 1320 self.new_diskparams = objects.FillDict(cluster.diskparams, {}) 1321 if self.op.diskparams: 1322 for dt_name, dt_params in self.op.diskparams.items(): 1323 if dt_name not in self.new_diskparams: 1324 self.new_diskparams[dt_name] = dt_params 1325 else: 1326 self.new_diskparams[dt_name].update(dt_params) 1327 CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg) 1328 1329 # os hypervisor parameters 1330 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) 1331 if self.op.os_hvp: 1332 for os_name, hvs in self.op.os_hvp.items(): 1333 if os_name not in self.new_os_hvp: 1334 self.new_os_hvp[os_name] = hvs 1335 else: 1336 for hv_name, hv_dict in hvs.items(): 1337 if hv_dict is None: 1338 # Delete if it exists 1339 self.new_os_hvp[os_name].pop(hv_name, None) 1340 elif hv_name not in self.new_os_hvp[os_name]: 1341 self.new_os_hvp[os_name][hv_name] = hv_dict 1342 else: 1343 self.new_os_hvp[os_name][hv_name].update(hv_dict) 1344 1345 # os parameters 1346 self._BuildOSParams(cluster) 1347 1348 # changes to the hypervisor list 1349 if self.op.enabled_hypervisors is not None: 1350 for hv in self.op.enabled_hypervisors: 1351 # if the hypervisor doesn't already exist in the cluster 1352 # hvparams, we initialize it to empty, and then (in both 1353 # cases) we make sure to fill the defaults, as we might not 1354 # have a complete defaults list if the hypervisor wasn't 1355 # enabled before 1356 if hv not in new_hvp: 1357 new_hvp[hv] = {} 1358 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv]) 1359 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES) 1360 1361 if self.op.hvparams or self.op.enabled_hypervisors is not None: 1362 # either the enabled list has changed, or the parameters have, validate 1363 for hv_name, hv_params in self.new_hvparams.items(): 1364 if ((self.op.hvparams and hv_name in self.op.hvparams) or 1365 (self.op.enabled_hypervisors and 1366 hv_name in self.op.enabled_hypervisors)): 1367 # either this is a new hypervisor, or its parameters have changed 1368 hv_class = hypervisor.GetHypervisorClass(hv_name) 1369 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 1370 hv_class.CheckParameterSyntax(hv_params) 1371 CheckHVParams(self, node_uuids, hv_name, hv_params) 1372 1373 self._CheckDiskTemplateConsistency() 1374 1375 if self.op.os_hvp: 1376 # no need to check any newly-enabled hypervisors, since the 1377 # defaults have already been checked in the above code-block 1378 for os_name, os_hvp in self.new_os_hvp.items(): 1379 for hv_name, hv_params in os_hvp.items(): 1380 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 1381 # we need to fill in the new os_hvp on top of the actual hv_p 1382 cluster_defaults = self.new_hvparams.get(hv_name, {}) 1383 new_osp = objects.FillDict(cluster_defaults, hv_params) 1384 hv_class = hypervisor.GetHypervisorClass(hv_name) 1385 
hv_class.CheckParameterSyntax(new_osp) 1386 CheckHVParams(self, node_uuids, hv_name, new_osp) 1387 1388 if self.op.default_iallocator: 1389 alloc_script = utils.FindFile(self.op.default_iallocator, 1390 constants.IALLOCATOR_SEARCH_PATH, 1391 os.path.isfile) 1392 if alloc_script is None: 1393 raise errors.OpPrereqError("Invalid default iallocator script '%s'" 1394 " specified" % self.op.default_iallocator, 1395 errors.ECODE_INVAL) 1396 1397 if self.op.instance_communication_network: 1398 network_name = self.op.instance_communication_network 1399 1400 try: 1401 network_uuid = self.cfg.LookupNetwork(network_name) 1402 except errors.OpPrereqError: 1403 network_uuid = None 1404 1405 if network_uuid is not None: 1406 network = self.cfg.GetNetwork(network_uuid) 1407 self._CheckInstanceCommunicationNetwork(network, self.LogWarning) 1408 1409 if self.op.compression_tools: 1410 CheckCompressionTools(self.op.compression_tools)
1411
1412 - def _BuildOSParams(self, cluster):
1413 "Calculate the new OS parameters for this operation." 1414 1415 def _GetNewParams(source, new_params): 1416 "Wrapper around GetUpdatedParams." 1417 if new_params is None: 1418 return source 1419 result = objects.FillDict(source, {}) # deep copy of source 1420 for os_name in new_params: 1421 result[os_name] = GetUpdatedParams(result.get(os_name, {}), 1422 new_params[os_name], 1423 use_none=True) 1424 if not result[os_name]: 1425 del result[os_name] # we removed all parameters 1426 return result
1427 1428 self.new_osp = _GetNewParams(cluster.osparams, 1429 self.op.osparams) 1430 self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster, 1431 self.op.osparams_private_cluster) 1432 1433 # Remove os validity check 1434 changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys())) 1435 for os_name in changed_oses: 1436 os_params = cluster.SimpleFillOS( 1437 os_name, 1438 self.new_osp.get(os_name, {}), 1439 os_params_private=self.new_osp_private.get(os_name, {}) 1440 ) 1441 # check the parameter validity (remote check) 1442 CheckOSParams(self, False, [self.cfg.GetMasterNode()], 1443 os_name, os_params, False) 1444
1445 - def _CheckDiskTemplateConsistency(self):
1446 """Check whether the disk templates that are going to be disabled 1447 are still in use by some instances. 1448 1449 """ 1450 if self.op.enabled_disk_templates: 1451 cluster = self.cfg.GetClusterInfo() 1452 instances = self.cfg.GetAllInstancesInfo() 1453 1454 disk_templates_to_remove = set(cluster.enabled_disk_templates) \ 1455 - set(self.op.enabled_disk_templates) 1456 for instance in instances.itervalues(): 1457 if instance.disk_template in disk_templates_to_remove: 1458 raise errors.OpPrereqError("Cannot disable disk template '%s'," 1459 " because instance '%s' is using it." % 1460 (instance.disk_template, instance.name))
1461
1462 - def _SetVgName(self, feedback_fn):
1463 """Determines and sets the new volume group name. 1464 1465 """ 1466 if self.op.vg_name is not None: 1467 new_volume = self.op.vg_name 1468 if not new_volume: 1469 new_volume = None 1470 if new_volume != self.cfg.GetVGName(): 1471 self.cfg.SetVGName(new_volume) 1472 else: 1473 feedback_fn("Cluster LVM configuration already in desired" 1474 " state, not changing")
1475
1476 - def _SetFileStorageDir(self, feedback_fn):
1477 """Set the file storage directory. 1478 1479 """ 1480 if self.op.file_storage_dir is not None: 1481 if self.cluster.file_storage_dir == self.op.file_storage_dir: 1482 feedback_fn("Global file storage dir already set to value '%s'" 1483 % self.cluster.file_storage_dir) 1484 else: 1485 self.cluster.file_storage_dir = self.op.file_storage_dir
1486
1487 - def _SetSharedFileStorageDir(self, feedback_fn):
1488 """Set the shared file storage directory. 1489 1490 """ 1491 if self.op.shared_file_storage_dir is not None: 1492 if self.cluster.shared_file_storage_dir == \ 1493 self.op.shared_file_storage_dir: 1494 feedback_fn("Global shared file storage dir already set to value '%s'" 1495 % self.cluster.shared_file_storage_dir) 1496 else: 1497 self.cluster.shared_file_storage_dir = self.op.shared_file_storage_dir
1498
1499 - def _SetDrbdHelper(self, feedback_fn):
1500 """Set the DRBD usermode helper. 1501 1502 """ 1503 if self.op.drbd_helper is not None: 1504 if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates: 1505 feedback_fn("Note that you specified a drbd user helper, but did not" 1506 " enable the drbd disk template.") 1507 new_helper = self.op.drbd_helper 1508 if not new_helper: 1509 new_helper = None 1510 if new_helper != self.cfg.GetDRBDHelper(): 1511 self.cfg.SetDRBDHelper(new_helper) 1512 else: 1513 feedback_fn("Cluster DRBD helper already in desired state," 1514 " not changing")
1515 1516 @staticmethod
1517 - def _EnsureInstanceCommunicationNetwork(cfg, network_name):
1518 """Ensure that the instance communication network exists and is 1519 connected to all groups. 1520 1521 The instance communication network given by L{network_name} it is 1522 created, if necessary, via the opcode 'OpNetworkAdd'. Also, the 1523 instance communication network is connected to all existing node 1524 groups, if necessary, via the opcode 'OpNetworkConnect'. 1525 1526 @type cfg: L{config.ConfigWriter} 1527 @param cfg: cluster configuration 1528 1529 @type network_name: string 1530 @param network_name: instance communication network name 1531 1532 @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None} 1533 @return: L{ganeti.cmdlib.ResultWithJobs} if the instance 1534 communication needs to be created or it needs to be 1535 connected to a group, otherwise L{None} 1536 1537 """ 1538 jobs = [] 1539 1540 try: 1541 network_uuid = cfg.LookupNetwork(network_name) 1542 network_exists = True 1543 except errors.OpPrereqError: 1544 network_exists = False 1545 1546 if not network_exists: 1547 jobs.append(AddInstanceCommunicationNetworkOp(network_name)) 1548 1549 for group_uuid in cfg.GetNodeGroupList(): 1550 group = cfg.GetNodeGroup(group_uuid) 1551 1552 if network_exists: 1553 network_connected = network_uuid in group.networks 1554 else: 1555 # The network was created asynchronously by the previous 1556 # opcode and, therefore, we don't have access to its 1557 # network_uuid. As a result, we assume that the network is 1558 # not connected to any group yet. 1559 network_connected = False 1560 1561 if not network_connected: 1562 op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name) 1563 jobs.append(op) 1564 1565 if jobs: 1566 return ResultWithJobs([jobs]) 1567 else: 1568 return None
1569 1570 @staticmethod
1571 - def _ModifyInstanceCommunicationNetwork(cfg, network_name, feedback_fn):
1572 """Update the instance communication network stored in the cluster 1573 configuration. 1574 1575 Compares the user-supplied instance communication network against 1576 the one stored in the Ganeti cluster configuration. If there is a 1577 change, the instance communication network may be possibly created 1578 and connected to all groups (see 1579 L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}). 1580 1581 @type cfg: L{config.ConfigWriter} 1582 @param cfg: cluster configuration 1583 1584 @type network_name: string 1585 @param network_name: instance communication network name 1586 1587 @type feedback_fn: function 1588 @param feedback_fn: see L{ganeti.cmdlist.base.LogicalUnit} 1589 1590 @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None} 1591 @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} 1592 1593 """ 1594 config_network_name = cfg.GetInstanceCommunicationNetwork() 1595 1596 if network_name == config_network_name: 1597 feedback_fn("Instance communication network already is '%s', nothing to" 1598 " do." % network_name) 1599 else: 1600 try: 1601 cfg.LookupNetwork(config_network_name) 1602 feedback_fn("Previous instance communication network '%s'" 1603 " should be removed manually." % config_network_name) 1604 except errors.OpPrereqError: 1605 pass 1606 1607 if network_name: 1608 feedback_fn("Changing instance communication network to '%s', only new" 1609 " instances will be affected." 1610 % network_name) 1611 else: 1612 feedback_fn("Disabling instance communication network, only new" 1613 " instances will be affected.") 1614 1615 cfg.SetInstanceCommunicationNetwork(network_name) 1616 1617 if network_name: 1618 return LUClusterSetParams._EnsureInstanceCommunicationNetwork( 1619 cfg, 1620 network_name) 1621 else: 1622 return None
1623
1624 - def Exec(self, feedback_fn):
1625 """Change the parameters of the cluster. 1626 1627 """ 1628 # re-read the fresh configuration 1629 self.cluster = self.cfg.GetClusterInfo() 1630 if self.op.enabled_disk_templates: 1631 self.cluster.enabled_disk_templates = \ 1632 list(self.op.enabled_disk_templates) 1633 # save the changes 1634 self.cfg.Update(self.cluster, feedback_fn) 1635 1636 self._SetVgName(feedback_fn) 1637 1638 self.cluster = self.cfg.GetClusterInfo() 1639 self._SetFileStorageDir(feedback_fn) 1640 self._SetSharedFileStorageDir(feedback_fn) 1641 self.cfg.Update(self.cluster, feedback_fn) 1642 self._SetDrbdHelper(feedback_fn) 1643 1644 # re-read the fresh configuration again 1645 self.cluster = self.cfg.GetClusterInfo() 1646 1647 ensure_kvmd = False 1648 1649 active = constants.DATA_COLLECTOR_STATE_ACTIVE 1650 if self.op.enabled_data_collectors is not None: 1651 for name, val in self.op.enabled_data_collectors.items(): 1652 self.cluster.data_collectors[name][active] = val 1653 1654 if self.op.data_collector_interval: 1655 internal = constants.DATA_COLLECTOR_PARAMETER_INTERVAL 1656 for name, val in self.op.data_collector_interval.items(): 1657 self.cluster.data_collectors[name][internal] = int(val) 1658 1659 if self.op.hvparams: 1660 self.cluster.hvparams = self.new_hvparams 1661 if self.op.os_hvp: 1662 self.cluster.os_hvp = self.new_os_hvp 1663 if self.op.enabled_hypervisors is not None: 1664 self.cluster.hvparams = self.new_hvparams 1665 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors 1666 ensure_kvmd = True 1667 if self.op.beparams: 1668 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams 1669 if self.op.nicparams: 1670 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams 1671 if self.op.ipolicy: 1672 self.cluster.ipolicy = self.new_ipolicy 1673 if self.op.osparams: 1674 self.cluster.osparams = self.new_osp 1675 if self.op.osparams_private_cluster: 1676 self.cluster.osparams_private_cluster = self.new_osp_private 1677 if self.op.ndparams: 1678 self.cluster.ndparams = self.new_ndparams 1679 if self.op.diskparams: 1680 self.cluster.diskparams = self.new_diskparams 1681 if self.op.hv_state: 1682 self.cluster.hv_state_static = self.new_hv_state 1683 if self.op.disk_state: 1684 self.cluster.disk_state_static = self.new_disk_state 1685 1686 if self.op.candidate_pool_size is not None: 1687 self.cluster.candidate_pool_size = self.op.candidate_pool_size 1688 # we need to update the pool size here, otherwise the save will fail 1689 AdjustCandidatePool(self, []) 1690 1691 if self.op.max_running_jobs is not None: 1692 self.cluster.max_running_jobs = self.op.max_running_jobs 1693 1694 if self.op.max_tracked_jobs is not None: 1695 self.cluster.max_tracked_jobs = self.op.max_tracked_jobs 1696 1697 if self.op.maintain_node_health is not None: 1698 self.cluster.maintain_node_health = self.op.maintain_node_health 1699 1700 if self.op.modify_etc_hosts is not None: 1701 self.cluster.modify_etc_hosts = self.op.modify_etc_hosts 1702 1703 if self.op.prealloc_wipe_disks is not None: 1704 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks 1705 1706 if self.op.add_uids is not None: 1707 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids) 1708 1709 if self.op.remove_uids is not None: 1710 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids) 1711 1712 if self.op.uid_pool is not None: 1713 self.cluster.uid_pool = self.op.uid_pool 1714 1715 if self.op.default_iallocator is not None: 1716 self.cluster.default_iallocator = self.op.default_iallocator 1717 1718 if 
self.op.default_iallocator_params is not None: 1719 self.cluster.default_iallocator_params = self.op.default_iallocator_params 1720 1721 if self.op.reserved_lvs is not None: 1722 self.cluster.reserved_lvs = self.op.reserved_lvs 1723 1724 if self.op.use_external_mip_script is not None: 1725 self.cluster.use_external_mip_script = self.op.use_external_mip_script 1726 1727 if self.op.enabled_user_shutdown is not None and \ 1728 self.cluster.enabled_user_shutdown != self.op.enabled_user_shutdown: 1729 self.cluster.enabled_user_shutdown = self.op.enabled_user_shutdown 1730 ensure_kvmd = True 1731 1732 def helper_os(aname, mods, desc): 1733 desc += " OS list" 1734 lst = getattr(self.cluster, aname) 1735 for key, val in mods: 1736 if key == constants.DDM_ADD: 1737 if val in lst: 1738 feedback_fn("OS %s already in %s, ignoring" % (val, desc)) 1739 else: 1740 lst.append(val) 1741 elif key == constants.DDM_REMOVE: 1742 if val in lst: 1743 lst.remove(val) 1744 else: 1745 feedback_fn("OS %s not found in %s, ignoring" % (val, desc)) 1746 else: 1747 raise errors.ProgrammerError("Invalid modification '%s'" % key)
1748 1749 if self.op.hidden_os: 1750 helper_os("hidden_os", self.op.hidden_os, "hidden") 1751 1752 if self.op.blacklisted_os: 1753 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") 1754 1755 if self.op.mac_prefix: 1756 self.cluster.mac_prefix = self.op.mac_prefix 1757 1758 if self.op.master_netdev: 1759 master_params = self.cfg.GetMasterNetworkParameters() 1760 ems = self.cfg.GetUseExternalMipScript() 1761 feedback_fn("Shutting down master ip on the current netdev (%s)" % 1762 self.cluster.master_netdev) 1763 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid, 1764 master_params, ems) 1765 if not self.op.force: 1766 result.Raise("Could not disable the master ip") 1767 else: 1768 if result.fail_msg: 1769 msg = ("Could not disable the master ip (continuing anyway): %s" % 1770 result.fail_msg) 1771 feedback_fn(msg) 1772 feedback_fn("Changing master_netdev from %s to %s" % 1773 (master_params.netdev, self.op.master_netdev)) 1774 self.cluster.master_netdev = self.op.master_netdev 1775 1776 if self.op.master_netmask: 1777 master_params = self.cfg.GetMasterNetworkParameters() 1778 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask) 1779 result = self.rpc.call_node_change_master_netmask( 1780 master_params.uuid, master_params.netmask, 1781 self.op.master_netmask, master_params.ip, 1782 master_params.netdev) 1783 result.Warn("Could not change the master IP netmask", feedback_fn) 1784 self.cluster.master_netmask = self.op.master_netmask 1785 1786 if self.op.install_image: 1787 self.cluster.install_image = self.op.install_image 1788 1789 if self.op.zeroing_image is not None: 1790 CheckImageValidity(self.op.zeroing_image, 1791 "Zeroing image must be an absolute path or a URL") 1792 self.cluster.zeroing_image = self.op.zeroing_image 1793 1794 self.cfg.Update(self.cluster, feedback_fn) 1795 1796 if self.op.master_netdev: 1797 master_params = self.cfg.GetMasterNetworkParameters() 1798 feedback_fn("Starting the master ip on the new master netdev (%s)" % 1799 self.op.master_netdev) 1800 ems = self.cfg.GetUseExternalMipScript() 1801 result = self.rpc.call_node_activate_master_ip(master_params.uuid, 1802 master_params, ems) 1803 result.Warn("Could not re-enable the master ip on the master," 1804 " please restart manually", self.LogWarning) 1805 1806 # Even though 'self.op.enabled_user_shutdown' is being tested 1807 # above, the RPCs can only be done after 'self.cfg.Update' because 1808 # this will update the cluster object and sync 'Ssconf', and kvmd 1809 # uses 'Ssconf'. 1810 if ensure_kvmd: 1811 EnsureKvmdOnNodes(self, feedback_fn) 1812 1813 if self.op.compression_tools is not None: 1814 self.cfg.SetCompressionTools(self.op.compression_tools) 1815 1816 network_name = self.op.instance_communication_network 1817 if network_name is not None: 1818 return self._ModifyInstanceCommunicationNetwork(self.cfg, 1819 network_name, feedback_fn) 1820 else: 1821 return None 1822
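# --- Illustrative sketch (not part of the Ganeti source): the hidden and
# blacklisted OS lists above are edited through helper_os with
# (DDM_ADD/DDM_REMOVE, name) pairs. A standalone model of that merge logic,
# using plain "add"/"remove" strings in place of the DDM_* constants:
def apply_os_list_mods(current, mods):
  """Return a copy of 'current' with the add/remove modifications applied."""
  result = list(current)
  for action, name in mods:
    if action == "add" and name not in result:
      result.append(name)
    elif action == "remove" and name in result:
      result.remove(name)
    # duplicate additions and missing removals are silently ignored here;
    # helper_os above reports them via feedback_fn instead
  return result

assert apply_os_list_mods(["debootstrap"],
                          [("add", "custom-os"), ("remove", "missing-os")]) \
  == ["debootstrap", "custom-os"]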
1823 1824 -class LUClusterVerify(NoHooksLU):
1825 """Submits all jobs necessary to verify the cluster. 1826 1827 """ 1828 REQ_BGL = False 1829
1830 - def ExpandNames(self):
1831 self.needed_locks = {}
1832
1833 - def Exec(self, feedback_fn):
1834 jobs = [] 1835 1836 if self.op.group_name: 1837 groups = [self.op.group_name] 1838 depends_fn = lambda: None 1839 else: 1840 groups = self.cfg.GetNodeGroupList() 1841 1842 # Verify global configuration 1843 jobs.append([ 1844 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors), 1845 ]) 1846 1847 # Always depend on global verification 1848 depends_fn = lambda: [(-len(jobs), [])] 1849 1850 jobs.extend( 1851 [opcodes.OpClusterVerifyGroup(group_name=group, 1852 ignore_errors=self.op.ignore_errors, 1853 depends=depends_fn(), 1854 verify_clutter=self.op.verify_clutter)] 1855 for group in groups) 1856 1857 # Fix up all parameters 1858 for op in itertools.chain(*jobs): # pylint: disable=W0142 1859 op.debug_simulate_errors = self.op.debug_simulate_errors 1860 op.verbose = self.op.verbose 1861 op.error_codes = self.op.error_codes 1862 try: 1863 op.skip_checks = self.op.skip_checks 1864 except AttributeError: 1865 assert not isinstance(op, opcodes.OpClusterVerifyGroup) 1866 1867 return ResultWithJobs(jobs)
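# --- Illustrative sketch (not part of the Ganeti source): the verification is
# submitted as one config-check job followed by one job per group, and each
# group job depends on the config job through a relative job id; -len(jobs),
# evaluated while the job list grows, always points back to the first job.
# A standalone model of that dependency layout:
def build_verify_jobs(groups):
  jobs = [["OpClusterVerifyConfig"]]
  for group in groups:
    depends = [(-len(jobs), [])]  # relative id resolving to the config job
    jobs.append([("OpClusterVerifyGroup", group, depends)])
  return jobs

jobs = build_verify_jobs(["default", "rack2"])
assert jobs[1][0][2] == [(-1, [])]   # first group job: one job back
assert jobs[2][0][2] == [(-2, [])]   # second group job: two jobs back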
1868
1869 1870 -class _VerifyErrors(object):
1871 """Mix-in for cluster/group verify LUs. 1872 1873 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects 1874 self.op and self._feedback_fn to be available.) 1875 1876 """ 1877 1878 ETYPE_FIELD = "code" 1879 ETYPE_ERROR = constants.CV_ERROR 1880 ETYPE_WARNING = constants.CV_WARNING 1881
1882 - def _Error(self, ecode, item, msg, *args, **kwargs):
1883 """Format an error message. 1884 1885 Based on the opcode's error_codes parameter, either format a 1886 parseable error code, or a simpler error string. 1887 1888 This must be called only from Exec and functions called from Exec. 1889 1890 """ 1891 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) 1892 itype, etxt, _ = ecode 1893 # If the error code is in the list of ignored errors, demote the error to a 1894 # warning 1895 if etxt in self.op.ignore_errors: # pylint: disable=E1101 1896 ltype = self.ETYPE_WARNING 1897 # first complete the msg 1898 if args: 1899 msg = msg % args 1900 # then format the whole message 1901 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101 1902 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) 1903 else: 1904 if item: 1905 item = " " + item 1906 else: 1907 item = "" 1908 msg = "%s: %s%s: %s" % (ltype, itype, item, msg) 1909 # and finally report it via the feedback_fn 1910 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101 1911 # do not mark the operation as failed for WARN cases only 1912 if ltype == self.ETYPE_ERROR: 1913 self.bad = True
1914
1915 - def _ErrorIf(self, cond, *args, **kwargs):
1916 """Log an error message if the passed condition is True. 1917 1918 """ 1919 if (bool(cond) 1920 or self.op.debug_simulate_errors): # pylint: disable=E1101 1921 self._Error(*args, **kwargs)
1922
1923 1924 -def _GetAllHypervisorParameters(cluster, instances):
1925 """Compute the set of all hypervisor parameters. 1926 1927 @type cluster: L{objects.Cluster} 1928 @param cluster: the cluster object 1929 @param instances: list of L{objects.Instance} 1930 @param instances: additional instances from which to obtain parameters 1931 @rtype: list of (origin, hypervisor, parameters) 1932 @return: a list with all parameters found, indicating the hypervisor they 1933 apply to, and the origin (can be "cluster", "os X", or "instance Y") 1934 1935 """ 1936 hvp_data = [] 1937 1938 for hv_name in cluster.enabled_hypervisors: 1939 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) 1940 1941 for os_name, os_hvp in cluster.os_hvp.items(): 1942 for hv_name, hv_params in os_hvp.items(): 1943 if hv_params: 1944 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) 1945 hvp_data.append(("os %s" % os_name, hv_name, full_params)) 1946 1947 # TODO: collapse identical parameter values in a single one 1948 for instance in instances: 1949 if instance.hvparams: 1950 hvp_data.append(("instance %s" % instance.name, instance.hypervisor, 1951 cluster.FillHV(instance))) 1952 1953 return hvp_data
1954
1955 1956 -class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1957 """Verifies the cluster config. 1958 1959 """ 1960 REQ_BGL = False 1961
1962 - def _VerifyHVP(self, hvp_data):
1963 """Verifies locally the syntax of the hypervisor parameters. 1964 1965 """ 1966 for item, hv_name, hv_params in hvp_data: 1967 msg = ("hypervisor %s parameters syntax check (source %s): %%s" % 1968 (item, hv_name)) 1969 try: 1970 hv_class = hypervisor.GetHypervisorClass(hv_name) 1971 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) 1972 hv_class.CheckParameterSyntax(hv_params) 1973 except errors.GenericError, err: 1974 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1975
1976 - def ExpandNames(self):
1977 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET) 1978 self.share_locks = ShareAll()
1979
1980 - def CheckPrereq(self):
1981 """Check prerequisites. 1982 1983 """ 1984 # Retrieve all information 1985 self.all_group_info = self.cfg.GetAllNodeGroupsInfo() 1986 self.all_node_info = self.cfg.GetAllNodesInfo() 1987 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1988
1989 - def Exec(self, feedback_fn):
1990 """Verify integrity of cluster, performing various test on nodes. 1991 1992 """ 1993 self.bad = False 1994 self._feedback_fn = feedback_fn 1995 1996 feedback_fn("* Verifying cluster config") 1997 1998 for msg in self.cfg.VerifyConfig(): 1999 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg) 2000 2001 feedback_fn("* Verifying cluster certificate files") 2002 2003 for cert_filename in pathutils.ALL_CERT_FILES: 2004 (errcode, msg) = utils.VerifyCertificate(cert_filename) 2005 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode) 2006 2007 self._ErrorIf(not utils.CanRead(constants.LUXID_USER, 2008 pathutils.NODED_CERT_FILE), 2009 constants.CV_ECLUSTERCERT, 2010 None, 2011 pathutils.NODED_CERT_FILE + " must be accessible by the " + 2012 constants.LUXID_USER + " user") 2013 2014 feedback_fn("* Verifying hypervisor parameters") 2015 2016 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(), 2017 self.all_inst_info.values())) 2018 2019 feedback_fn("* Verifying all nodes belong to an existing group") 2020 2021 # We do this verification here because, should this bogus circumstance 2022 # occur, it would never be caught by VerifyGroup, which only acts on 2023 # nodes/instances reachable from existing node groups. 2024 2025 dangling_nodes = set(node for node in self.all_node_info.values() 2026 if node.group not in self.all_group_info) 2027 2028 dangling_instances = {} 2029 no_node_instances = [] 2030 2031 for inst in self.all_inst_info.values(): 2032 if inst.primary_node in [node.uuid for node in dangling_nodes]: 2033 dangling_instances.setdefault(inst.primary_node, []).append(inst) 2034 elif inst.primary_node not in self.all_node_info: 2035 no_node_instances.append(inst) 2036 2037 pretty_dangling = [ 2038 "%s (%s)" % 2039 (node.name, 2040 utils.CommaJoin(inst.name for 2041 inst in dangling_instances.get(node.uuid, []))) 2042 for node in dangling_nodes] 2043 2044 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES, 2045 None, 2046 "the following nodes (and their instances) belong to a non" 2047 " existing group: %s", utils.CommaJoin(pretty_dangling)) 2048 2049 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST, 2050 None, 2051 "the following instances have a non-existing primary-node:" 2052 " %s", utils.CommaJoin(inst.name for 2053 inst in no_node_instances)) 2054 2055 return not self.bad
2056
2057 2058 -class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
2059 """Verifies the status of a node group. 2060 2061 """ 2062 HPATH = "cluster-verify" 2063 HTYPE = constants.HTYPE_CLUSTER 2064 REQ_BGL = False 2065 2066 _HOOKS_INDENT_RE = re.compile("^", re.M) 2067
2068 - class NodeImage(object):
2069 """A class representing the logical and physical status of a node. 2070 2071 @type uuid: string 2072 @ivar uuid: the node UUID to which this object refers 2073 @ivar volumes: a structure as returned from 2074 L{ganeti.backend.GetVolumeList} (runtime) 2075 @ivar instances: a list of running instances (runtime) 2076 @ivar pinst: list of configured primary instances (config) 2077 @ivar sinst: list of configured secondary instances (config) 2078 @ivar sbp: dictionary of {primary-node: list of instances} for all 2079 instances for which this node is secondary (config) 2080 @ivar mfree: free memory, as reported by hypervisor (runtime) 2081 @ivar dfree: free disk, as reported by the node (runtime) 2082 @ivar offline: the offline status (config) 2083 @type rpc_fail: boolean 2084 @ivar rpc_fail: whether the RPC verify call was successfull (overall, 2085 not whether the individual keys were correct) (runtime) 2086 @type lvm_fail: boolean 2087 @ivar lvm_fail: whether the RPC call didn't return valid LVM data 2088 @type hyp_fail: boolean 2089 @ivar hyp_fail: whether the RPC call didn't return the instance list 2090 @type ghost: boolean 2091 @ivar ghost: whether this is a known node or not (config) 2092 @type os_fail: boolean 2093 @ivar os_fail: whether the RPC call didn't return valid OS data 2094 @type oslist: list 2095 @ivar oslist: list of OSes as diagnosed by DiagnoseOS 2096 @type vm_capable: boolean 2097 @ivar vm_capable: whether the node can host instances 2098 @type pv_min: float 2099 @ivar pv_min: size in MiB of the smallest PVs 2100 @type pv_max: float 2101 @ivar pv_max: size in MiB of the biggest PVs 2102 2103 """
2104 - def __init__(self, offline=False, uuid=None, vm_capable=True):
2105 self.uuid = uuid 2106 self.volumes = {} 2107 self.instances = [] 2108 self.pinst = [] 2109 self.sinst = [] 2110 self.sbp = {} 2111 self.mfree = 0 2112 self.dfree = 0 2113 self.offline = offline 2114 self.vm_capable = vm_capable 2115 self.rpc_fail = False 2116 self.lvm_fail = False 2117 self.hyp_fail = False 2118 self.ghost = False 2119 self.os_fail = False 2120 self.oslist = {} 2121 self.pv_min = None 2122 self.pv_max = None
2123
2124 - def ExpandNames(self):
2125 # This raises errors.OpPrereqError on its own: 2126 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) 2127 2128 # Get instances in node group; this is unsafe and needs verification later 2129 inst_uuids = \ 2130 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True) 2131 2132 self.needed_locks = { 2133 locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids), 2134 locking.LEVEL_NODEGROUP: [self.group_uuid], 2135 locking.LEVEL_NODE: [], 2136 2137 # This opcode is run by watcher every five minutes and acquires all nodes 2138 # for a group. It doesn't run for a long time, so it's better to acquire 2139 # the node allocation lock as well. 2140 locking.LEVEL_NODE_ALLOC: locking.ALL_SET, 2141 } 2142 2143 self.share_locks = ShareAll()
2144
2145 - def DeclareLocks(self, level):
2146 if level == locking.LEVEL_NODE: 2147 # Get members of node group; this is unsafe and needs verification later 2148 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members) 2149 2150 # In Exec(), we warn about mirrored instances that have primary and 2151 # secondary living in separate node groups. To fully verify that 2152 # volumes for these instances are healthy, we will need to do an 2153 # extra call to their secondaries. We ensure here those nodes will 2154 # be locked. 2155 for inst_name in self.owned_locks(locking.LEVEL_INSTANCE): 2156 # Important: access only the instances whose lock is owned 2157 instance = self.cfg.GetInstanceInfoByName(inst_name) 2158 if instance.disk_template in constants.DTS_INT_MIRROR: 2159 nodes.update(self.cfg.GetInstanceSecondaryNodes(instance.uuid)) 2160 2161 self.needed_locks[locking.LEVEL_NODE] = nodes
2162
2163 - def CheckPrereq(self):
2164 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) 2165 self.group_info = self.cfg.GetNodeGroup(self.group_uuid) 2166 2167 group_node_uuids = set(self.group_info.members) 2168 group_inst_uuids = \ 2169 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True) 2170 2171 unlocked_node_uuids = \ 2172 group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE)) 2173 2174 unlocked_inst_uuids = \ 2175 group_inst_uuids.difference( 2176 [self.cfg.GetInstanceInfoByName(name).uuid 2177 for name in self.owned_locks(locking.LEVEL_INSTANCE)]) 2178 2179 if unlocked_node_uuids: 2180 raise errors.OpPrereqError( 2181 "Missing lock for nodes: %s" % 2182 utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)), 2183 errors.ECODE_STATE) 2184 2185 if unlocked_inst_uuids: 2186 raise errors.OpPrereqError( 2187 "Missing lock for instances: %s" % 2188 utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)), 2189 errors.ECODE_STATE) 2190 2191 self.all_node_info = self.cfg.GetAllNodesInfo() 2192 self.all_inst_info = self.cfg.GetAllInstancesInfo() 2193 2194 self.my_node_uuids = group_node_uuids 2195 self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid]) 2196 for node_uuid in group_node_uuids) 2197 2198 self.my_inst_uuids = group_inst_uuids 2199 self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid]) 2200 for inst_uuid in group_inst_uuids) 2201 2202 # We detect here the nodes that will need the extra RPC calls for verifying 2203 # split LV volumes; they should be locked. 2204 extra_lv_nodes = set() 2205 2206 for inst in self.my_inst_info.values(): 2207 if inst.disk_template in constants.DTS_INT_MIRROR: 2208 inst_nodes = self.cfg.GetInstanceNodes(inst.uuid) 2209 for nuuid in inst_nodes: 2210 if self.all_node_info[nuuid].group != self.group_uuid: 2211 extra_lv_nodes.add(nuuid) 2212 2213 unlocked_lv_nodes = \ 2214 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE)) 2215 2216 if unlocked_lv_nodes: 2217 raise errors.OpPrereqError("Missing node locks for LV check: %s" % 2218 utils.CommaJoin(unlocked_lv_nodes), 2219 errors.ECODE_STATE) 2220 self.extra_lv_nodes = list(extra_lv_nodes)
2221
2222 - def _VerifyNode(self, ninfo, nresult):
2223 """Perform some basic validation on data returned from a node. 2224 2225 - check the result data structure is well formed and has all the 2226 mandatory fields 2227 - check ganeti version 2228 2229 @type ninfo: L{objects.Node} 2230 @param ninfo: the node to check 2231 @param nresult: the results from the node 2232 @rtype: boolean 2233 @return: whether overall this call was successful (and we can expect 2234 reasonable values in the respose) 2235 2236 """ 2237 # main result, nresult should be a non-empty dict 2238 test = not nresult or not isinstance(nresult, dict) 2239 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name, 2240 "unable to verify node: no data returned") 2241 if test: 2242 return False 2243 2244 # compares ganeti version 2245 local_version = constants.PROTOCOL_VERSION 2246 remote_version = nresult.get("version", None) 2247 test = not (remote_version and 2248 isinstance(remote_version, (list, tuple)) and 2249 len(remote_version) == 2) 2250 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name, 2251 "connection to node returned invalid data") 2252 if test: 2253 return False 2254 2255 test = local_version != remote_version[0] 2256 self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name, 2257 "incompatible protocol versions: master %s," 2258 " node %s", local_version, remote_version[0]) 2259 if test: 2260 return False 2261 2262 # node seems compatible, we can actually try to look into its results 2263 2264 # full package version 2265 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], 2266 constants.CV_ENODEVERSION, ninfo.name, 2267 "software version mismatch: master %s, node %s", 2268 constants.RELEASE_VERSION, remote_version[1], 2269 code=self.ETYPE_WARNING) 2270 2271 hyp_result = nresult.get(constants.NV_HYPERVISOR, None) 2272 if ninfo.vm_capable and isinstance(hyp_result, dict): 2273 for hv_name, hv_result in hyp_result.iteritems(): 2274 test = hv_result is not None 2275 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name, 2276 "hypervisor %s verify failure: '%s'", hv_name, hv_result) 2277 2278 hvp_result = nresult.get(constants.NV_HVPARAMS, None) 2279 if ninfo.vm_capable and isinstance(hvp_result, list): 2280 for item, hv_name, hv_result in hvp_result: 2281 self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name, 2282 "hypervisor %s parameter verify failure (source %s): %s", 2283 hv_name, item, hv_result) 2284 2285 test = nresult.get(constants.NV_NODESETUP, 2286 ["Missing NODESETUP results"]) 2287 self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name, 2288 "node setup error: %s", "; ".join(test)) 2289 2290 return True
2291
2292 - def _VerifyNodeTime(self, ninfo, nresult, 2293 nvinfo_starttime, nvinfo_endtime):
2294 """Check the node time. 2295 2296 @type ninfo: L{objects.Node} 2297 @param ninfo: the node to check 2298 @param nresult: the remote results for the node 2299 @param nvinfo_starttime: the start time of the RPC call 2300 @param nvinfo_endtime: the end time of the RPC call 2301 2302 """ 2303 ntime = nresult.get(constants.NV_TIME, None) 2304 try: 2305 ntime_merged = utils.MergeTime(ntime) 2306 except (ValueError, TypeError): 2307 self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name, 2308 "Node returned invalid time") 2309 return 2310 2311 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): 2312 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged) 2313 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW): 2314 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime) 2315 else: 2316 ntime_diff = None 2317 2318 self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name, 2319 "Node time diverges by at least %s from master node time", 2320 ntime_diff)
2321
2322 - def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
2323 """Check the node LVM results and update info for cross-node checks. 2324 2325 @type ninfo: L{objects.Node} 2326 @param ninfo: the node to check 2327 @param nresult: the remote results for the node 2328 @param vg_name: the configured VG name 2329 @type nimg: L{NodeImage} 2330 @param nimg: node image 2331 2332 """ 2333 if vg_name is None: 2334 return 2335 2336 # checks vg existence and size > 20G 2337 vglist = nresult.get(constants.NV_VGLIST, None) 2338 test = not vglist 2339 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name, 2340 "unable to check volume groups") 2341 if not test: 2342 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, 2343 constants.MIN_VG_SIZE) 2344 self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus) 2345 2346 # Check PVs 2347 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage) 2348 for em in errmsgs: 2349 self._Error(constants.CV_ENODELVM, ninfo.name, em) 2350 if pvminmax is not None: 2351 (nimg.pv_min, nimg.pv_max) = pvminmax
2352
2353 - def _VerifyGroupDRBDVersion(self, node_verify_infos):
2354 """Check cross-node DRBD version consistency. 2355 2356 @type node_verify_infos: dict 2357 @param node_verify_infos: infos about nodes as returned from the 2358 node_verify call. 2359 2360 """ 2361 node_versions = {} 2362 for node_uuid, ndata in node_verify_infos.items(): 2363 nresult = ndata.payload 2364 if nresult: 2365 version = nresult.get(constants.NV_DRBDVERSION, None) 2366 if version: 2367 node_versions[node_uuid] = version 2368 2369 if len(set(node_versions.values())) > 1: 2370 for node_uuid, version in sorted(node_versions.items()): 2371 msg = "DRBD version mismatch: %s" % version 2372 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg, 2373 code=self.ETYPE_WARNING)
2374
2375 - def _VerifyGroupLVM(self, node_image, vg_name):
2376 """Check cross-node consistency in LVM. 2377 2378 @type node_image: dict 2379 @param node_image: info about nodes, mapping from node to names to 2380 L{NodeImage} objects 2381 @param vg_name: the configured VG name 2382 2383 """ 2384 if vg_name is None: 2385 return 2386 2387 # Only exclusive storage needs this kind of checks 2388 if not self._exclusive_storage: 2389 return 2390 2391 # exclusive_storage wants all PVs to have the same size (approximately), 2392 # if the smallest and the biggest ones are okay, everything is fine. 2393 # pv_min is None iff pv_max is None 2394 vals = filter((lambda ni: ni.pv_min is not None), node_image.values()) 2395 if not vals: 2396 return 2397 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals) 2398 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals) 2399 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax) 2400 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name, 2401 "PV sizes differ too much in the group; smallest (%s MB) is" 2402 " on %s, biggest (%s MB) is on %s", 2403 pvmin, self.cfg.GetNodeName(minnode_uuid), 2404 pvmax, self.cfg.GetNodeName(maxnode_uuid))
2405
2406 - def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2407 """Check the node bridges. 2408 2409 @type ninfo: L{objects.Node} 2410 @param ninfo: the node to check 2411 @param nresult: the remote results for the node 2412 @param bridges: the expected list of bridges 2413 2414 """ 2415 if not bridges: 2416 return 2417 2418 missing = nresult.get(constants.NV_BRIDGES, None) 2419 test = not isinstance(missing, list) 2420 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name, 2421 "did not return valid bridge information") 2422 if not test: 2423 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name, 2424 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2425
2426 - def _VerifyNodeUserScripts(self, ninfo, nresult):
2427 """Check the results of user scripts presence and executability on the node 2428 2429 @type ninfo: L{objects.Node} 2430 @param ninfo: the node to check 2431 @param nresult: the remote results for the node 2432 2433 """ 2434 test = not constants.NV_USERSCRIPTS in nresult 2435 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name, 2436 "did not return user scripts information") 2437 2438 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None) 2439 if not test: 2440 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name, 2441 "user scripts not present or not executable: %s" % 2442 utils.CommaJoin(sorted(broken_scripts)))
2443
2444 - def _VerifyNodeNetwork(self, ninfo, nresult):
2445 """Check the node network connectivity results. 2446 2447 @type ninfo: L{objects.Node} 2448 @param ninfo: the node to check 2449 @param nresult: the remote results for the node 2450 2451 """ 2452 test = constants.NV_NODELIST not in nresult 2453 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name, 2454 "node hasn't returned node ssh connectivity data") 2455 if not test: 2456 if nresult[constants.NV_NODELIST]: 2457 for a_node, a_msg in nresult[constants.NV_NODELIST].items(): 2458 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name, 2459 "ssh communication with node '%s': %s", a_node, a_msg) 2460 2461 test = constants.NV_NODENETTEST not in nresult 2462 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name, 2463 "node hasn't returned node tcp connectivity data") 2464 if not test: 2465 if nresult[constants.NV_NODENETTEST]: 2466 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys()) 2467 for anode in nlist: 2468 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, 2469 "tcp communication with node '%s': %s", 2470 anode, nresult[constants.NV_NODENETTEST][anode]) 2471 2472 test = constants.NV_MASTERIP not in nresult 2473 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name, 2474 "node hasn't returned node master IP reachability data") 2475 if not test: 2476 if not nresult[constants.NV_MASTERIP]: 2477 if ninfo.uuid == self.master_node: 2478 msg = "the master node cannot reach the master IP (not configured?)" 2479 else: 2480 msg = "cannot reach the master IP" 2481 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2482
2483 - def _VerifyInstance(self, instance, node_image, diskstatus):
2484 """Verify an instance. 2485 2486 This function checks to see if the required block devices are 2487 available on the instance's node, and that the nodes are in the correct 2488 state. 2489 2490 """ 2491 pnode_uuid = instance.primary_node 2492 pnode_img = node_image[pnode_uuid] 2493 groupinfo = self.cfg.GetAllNodeGroupsInfo() 2494 2495 node_vol_should = {} 2496 self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should) 2497 2498 cluster = self.cfg.GetClusterInfo() 2499 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, 2500 self.group_info) 2501 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg) 2502 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name, 2503 utils.CommaJoin(err), code=self.ETYPE_WARNING) 2504 2505 for node_uuid in node_vol_should: 2506 n_img = node_image[node_uuid] 2507 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: 2508 # ignore missing volumes on offline or broken nodes 2509 continue 2510 for volume in node_vol_should[node_uuid]: 2511 test = volume not in n_img.volumes 2512 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name, 2513 "volume %s missing on node %s", volume, 2514 self.cfg.GetNodeName(node_uuid)) 2515 2516 if instance.admin_state == constants.ADMINST_UP: 2517 test = instance.uuid not in pnode_img.instances and not pnode_img.offline 2518 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name, 2519 "instance not running on its primary node %s", 2520 self.cfg.GetNodeName(pnode_uuid)) 2521 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, 2522 instance.name, "instance is marked as running and lives on" 2523 " offline node %s", self.cfg.GetNodeName(pnode_uuid)) 2524 2525 diskdata = [(nname, success, status, idx) 2526 for (nname, disks) in diskstatus.items() 2527 for idx, (success, status) in enumerate(disks)] 2528 2529 for nname, success, bdev_status, idx in diskdata: 2530 # the 'ghost node' construction in Exec() ensures that we have a 2531 # node here 2532 snode = node_image[nname] 2533 bad_snode = snode.ghost or snode.offline 2534 self._ErrorIf(instance.disks_active and 2535 not success and not bad_snode, 2536 constants.CV_EINSTANCEFAULTYDISK, instance.name, 2537 "couldn't retrieve status for disk/%s on %s: %s", 2538 idx, self.cfg.GetNodeName(nname), bdev_status) 2539 2540 if instance.disks_active and success and bdev_status.is_degraded: 2541 msg = "disk/%s on %s is degraded" % (idx, self.cfg.GetNodeName(nname)) 2542 2543 code = self.ETYPE_ERROR 2544 accepted_lds = [constants.LDS_OKAY, constants.LDS_SYNC] 2545 2546 if bdev_status.ldisk_status in accepted_lds: 2547 code = self.ETYPE_WARNING 2548 2549 msg += "; local disk state is '%s'" % \ 2550 constants.LDS_NAMES[bdev_status.ldisk_status] 2551 2552 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg, 2553 code=code) 2554 2555 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline, 2556 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid), 2557 "instance %s, connection to primary node failed", 2558 instance.name) 2559 2560 secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid) 2561 self._ErrorIf(len(secondary_nodes) > 1, 2562 constants.CV_EINSTANCELAYOUT, instance.name, 2563 "instance has multiple secondary nodes: %s", 2564 utils.CommaJoin(secondary_nodes), 2565 code=self.ETYPE_WARNING) 2566 2567 inst_nodes = self.cfg.GetInstanceNodes(instance.uuid) 2568 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, inst_nodes) 2569 if any(es_flags.values()): 2570 if instance.disk_template not 
in constants.DTS_EXCL_STORAGE: 2571 # Disk template not compatible with exclusive_storage: no instance 2572 # node should have the flag set 2573 es_nodes = [n 2574 for (n, es) in es_flags.items() 2575 if es] 2576 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name, 2577 "instance has template %s, which is not supported on nodes" 2578 " that have exclusive storage set: %s", 2579 instance.disk_template, 2580 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes))) 2581 for (idx, disk) in enumerate(self.cfg.GetInstanceDisks(instance.uuid)): 2582 self._ErrorIf(disk.spindles is None, 2583 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name, 2584 "number of spindles not configured for disk %s while" 2585 " exclusive storage is enabled, try running" 2586 " gnt-cluster repair-disk-sizes", idx) 2587 2588 if instance.disk_template in constants.DTS_INT_MIRROR: 2589 instance_nodes = utils.NiceSort(inst_nodes) 2590 instance_groups = {} 2591 2592 for node_uuid in instance_nodes: 2593 instance_groups.setdefault(self.all_node_info[node_uuid].group, 2594 []).append(node_uuid) 2595 2596 pretty_list = [ 2597 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)), 2598 groupinfo[group].name) 2599 # Sort so that we always list the primary node first. 2600 for group, nodes in sorted(instance_groups.items(), 2601 key=lambda (_, nodes): pnode_uuid in nodes, 2602 reverse=True)] 2603 2604 self._ErrorIf(len(instance_groups) > 1, 2605 constants.CV_EINSTANCESPLITGROUPS, 2606 instance.name, "instance has primary and secondary nodes in" 2607 " different groups: %s", utils.CommaJoin(pretty_list), 2608 code=self.ETYPE_WARNING) 2609 2610 inst_nodes_offline = [] 2611 for snode in secondary_nodes: 2612 s_img = node_image[snode] 2613 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC, 2614 self.cfg.GetNodeName(snode), 2615 "instance %s, connection to secondary node failed", 2616 instance.name) 2617 2618 if s_img.offline: 2619 inst_nodes_offline.append(snode) 2620 2621 # warn that the instance lives on offline nodes 2622 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, 2623 instance.name, "instance has offline secondary node(s) %s", 2624 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline))) 2625 # ... or ghost/non-vm_capable nodes 2626 for node_uuid in inst_nodes: 2627 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE, 2628 instance.name, "instance lives on ghost node %s", 2629 self.cfg.GetNodeName(node_uuid)) 2630 self._ErrorIf(not node_image[node_uuid].vm_capable, 2631 constants.CV_EINSTANCEBADNODE, instance.name, 2632 "instance lives on non-vm_capable node %s", 2633 self.cfg.GetNodeName(node_uuid))
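# --- Illustrative sketch (not part of the Ganeti source): the split-group
# warning above fires for a mirrored instance whose nodes belong to more than
# one node group. The underlying test is a one-liner over a node->group map:
def instance_spans_groups(node_groups, instance_nodes):
  return len(set(node_groups[n] for n in instance_nodes)) > 1

assert not instance_spans_groups({"n1": "g1", "n2": "g1"}, ["n1", "n2"])
assert instance_spans_groups({"n1": "g1", "n2": "g2"}, ["n1", "n2"])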
2634
2635 - def _VerifyOrphanVolumes(self, vg_name, node_vol_should, node_image, 2636 reserved):
2637 """Verify if there are any unknown volumes in the cluster. 2638 2639 The .os, .swap and backup volumes are ignored. All other volumes are 2640 reported as unknown. 2641 2642 @type vg_name: string 2643 @param vg_name: the name of the Ganeti-administered volume group 2644 @type reserved: L{ganeti.utils.FieldSet} 2645 @param reserved: a FieldSet of reserved volume names 2646 2647 """ 2648 for node_uuid, n_img in node_image.items(): 2649 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or 2650 self.all_node_info[node_uuid].group != self.group_uuid): 2651 # skip non-healthy nodes 2652 continue 2653 for volume in n_img.volumes: 2654 # skip volumes not belonging to the ganeti-administered volume group 2655 if volume.split('/')[0] != vg_name: 2656 continue 2657 2658 test = ((node_uuid not in node_vol_should or 2659 volume not in node_vol_should[node_uuid]) and 2660 not reserved.Matches(volume)) 2661 self._ErrorIf(test, constants.CV_ENODEORPHANLV, 2662 self.cfg.GetNodeName(node_uuid), 2663 "volume %s is unknown", volume, 2664 code=_VerifyErrors.ETYPE_WARNING)
2665
2666 - def _VerifyNPlusOneMemory(self, node_image, all_insts):
2667 """Verify N+1 Memory Resilience. 2668 2669 Check that if one single node dies we can still start all the 2670 instances it was primary for. 2671 2672 """ 2673 cluster_info = self.cfg.GetClusterInfo() 2674 for node_uuid, n_img in node_image.items(): 2675 # This code checks that every node which is now listed as 2676 # secondary has enough memory to host all instances it is 2677 # supposed to should a single other node in the cluster fail. 2678 # FIXME: not ready for failover to an arbitrary node 2679 # FIXME: does not support file-backed instances 2680 # WARNING: we currently take into account down instances as well 2681 # as up ones, considering that even if they're down someone 2682 # might want to start them even in the event of a node failure. 2683 if n_img.offline or \ 2684 self.all_node_info[node_uuid].group != self.group_uuid: 2685 # we're skipping nodes marked offline and nodes in other groups from 2686 # the N+1 warning, since most likely we don't have good memory 2687 # information from them; we already list instances living on such 2688 # nodes, and that's enough warning 2689 continue 2690 #TODO(dynmem): also consider ballooning out other instances 2691 for prinode, inst_uuids in n_img.sbp.items(): 2692 needed_mem = 0 2693 for inst_uuid in inst_uuids: 2694 bep = cluster_info.FillBE(all_insts[inst_uuid]) 2695 if bep[constants.BE_AUTO_BALANCE]: 2696 needed_mem += bep[constants.BE_MINMEM] 2697 test = n_img.mfree < needed_mem 2698 self._ErrorIf(test, constants.CV_ENODEN1, 2699 self.cfg.GetNodeName(node_uuid), 2700 "not enough memory to accomodate instance failovers" 2701 " should node %s fail (%dMiB needed, %dMiB available)", 2702 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2703
2704 - def _VerifyClientCertificates(self, nodes, all_nvinfo):
2705 """Verifies the consistency of the client certificates. 2706 2707 This includes several aspects: 2708 - the individual validation of all nodes' certificates 2709 - the consistency of the master candidate certificate map 2710 - the consistency of the master candidate certificate map with the 2711 certificates that the master candidates are actually using. 2712 2713 @param nodes: the list of nodes to consider in this verification 2714 @param all_nvinfo: the map of results of the verify_node call to 2715 all nodes 2716 2717 """ 2718 candidate_certs = self.cfg.GetClusterInfo().candidate_certs 2719 if candidate_certs is None or len(candidate_certs) == 0: 2720 self._ErrorIf( 2721 True, constants.CV_ECLUSTERCLIENTCERT, None, 2722 "The cluster's list of master candidate certificates is empty." 2723 " If you just updated the cluster, please run" 2724 " 'gnt-cluster renew-crypto --new-node-certificates'.") 2725 return 2726 2727 self._ErrorIf( 2728 len(candidate_certs) != len(set(candidate_certs.values())), 2729 constants.CV_ECLUSTERCLIENTCERT, None, 2730 "There are at least two master candidates configured to use the same" 2731 " certificate.") 2732 2733 # collect the client certificate 2734 for node in nodes: 2735 if node.offline: 2736 continue 2737 2738 nresult = all_nvinfo[node.uuid] 2739 if nresult.fail_msg or not nresult.payload: 2740 continue 2741 2742 (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None) 2743 2744 self._ErrorIf( 2745 errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None, 2746 "Client certificate of node '%s' failed validation: %s (code '%s')", 2747 node.uuid, msg, errcode) 2748 2749 if not errcode: 2750 digest = msg 2751 if node.master_candidate: 2752 if node.uuid in candidate_certs: 2753 self._ErrorIf( 2754 digest != candidate_certs[node.uuid], 2755 constants.CV_ECLUSTERCLIENTCERT, None, 2756 "Client certificate digest of master candidate '%s' does not" 2757 " match its entry in the cluster's map of master candidate" 2758 " certificates. Expected: %s Got: %s", node.uuid, 2759 digest, candidate_certs[node.uuid]) 2760 else: 2761 self._ErrorIf( 2762 True, constants.CV_ECLUSTERCLIENTCERT, None, 2763 "The master candidate '%s' does not have an entry in the" 2764 " map of candidate certificates.", node.uuid) 2765 self._ErrorIf( 2766 digest in candidate_certs.values(), 2767 constants.CV_ECLUSTERCLIENTCERT, None, 2768 "Master candidate '%s' is using a certificate of another node.", 2769 node.uuid) 2770 else: 2771 self._ErrorIf( 2772 node.uuid in candidate_certs, 2773 constants.CV_ECLUSTERCLIENTCERT, None, 2774 "Node '%s' is not a master candidate, but still listed in the" 2775 " map of master candidate certificates.", node.uuid) 2776 self._ErrorIf( 2777 (node.uuid not in candidate_certs) and 2778 (digest in candidate_certs.values()), 2779 constants.CV_ECLUSTERCLIENTCERT, None, 2780 "Node '%s' is not a master candidate and is incorrectly using a" 2781 " certificate of another node which is master candidate.", 2782 node.uuid)
2783
2784 - def _VerifySshSetup(self, nodes, all_nvinfo):
2785 """Evaluates the verification results of the SSH setup and clutter test. 2786 2787 @param nodes: List of L{objects.Node} objects 2788 @param all_nvinfo: RPC results 2789 2790 """ 2791 for node in nodes: 2792 if not node.offline: 2793 nresult = all_nvinfo[node.uuid] 2794 if nresult.fail_msg or not nresult.payload: 2795 self._ErrorIf(True, constants.CV_ENODESSH, node.name, 2796 "Could not verify the SSH setup of this node.") 2797 return 2798 for ssh_test in [constants.NV_SSH_SETUP, constants.NV_SSH_CLUTTER]: 2799 result = nresult.payload.get(ssh_test, None) 2800 error_msg = "" 2801 if isinstance(result, list): 2802 error_msg = " ".join(result) 2803 self._ErrorIf(result, 2804 constants.CV_ENODESSH, None, error_msg)
2805
2806 - def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo, 2807 (files_all, files_opt, files_mc, files_vm)):
2808 """Verifies file checksums collected from all nodes. 2809 2810 @param nodes: List of L{objects.Node} objects 2811 @param master_node_uuid: UUID of master node 2812 @param all_nvinfo: RPC results 2813 2814 """ 2815 # Define functions determining which nodes to consider for a file 2816 files2nodefn = [ 2817 (files_all, None), 2818 (files_mc, lambda node: (node.master_candidate or 2819 node.uuid == master_node_uuid)), 2820 (files_vm, lambda node: node.vm_capable), 2821 ] 2822 2823 # Build mapping from filename to list of nodes which should have the file 2824 nodefiles = {} 2825 for (files, fn) in files2nodefn: 2826 if fn is None: 2827 filenodes = nodes 2828 else: 2829 filenodes = filter(fn, nodes) 2830 nodefiles.update((filename, 2831 frozenset(map(operator.attrgetter("uuid"), filenodes))) 2832 for filename in files) 2833 2834 assert set(nodefiles) == (files_all | files_mc | files_vm) 2835 2836 fileinfo = dict((filename, {}) for filename in nodefiles) 2837 ignore_nodes = set() 2838 2839 for node in nodes: 2840 if node.offline: 2841 ignore_nodes.add(node.uuid) 2842 continue 2843 2844 nresult = all_nvinfo[node.uuid] 2845 2846 if nresult.fail_msg or not nresult.payload: 2847 node_files = None 2848 else: 2849 fingerprints = nresult.payload.get(constants.NV_FILELIST, {}) 2850 node_files = dict((vcluster.LocalizeVirtualPath(key), value) 2851 for (key, value) in fingerprints.items()) 2852 del fingerprints 2853 2854 test = not (node_files and isinstance(node_files, dict)) 2855 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name, 2856 "Node did not return file checksum data") 2857 if test: 2858 ignore_nodes.add(node.uuid) 2859 continue 2860 2861 # Build per-checksum mapping from filename to nodes having it 2862 for (filename, checksum) in node_files.items(): 2863 assert filename in nodefiles 2864 fileinfo[filename].setdefault(checksum, set()).add(node.uuid) 2865 2866 for (filename, checksums) in fileinfo.items(): 2867 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum" 2868 2869 # Nodes having the file 2870 with_file = frozenset(node_uuid 2871 for node_uuids in fileinfo[filename].values() 2872 for node_uuid in node_uuids) - ignore_nodes 2873 2874 expected_nodes = nodefiles[filename] - ignore_nodes 2875 2876 # Nodes missing file 2877 missing_file = expected_nodes - with_file 2878 2879 if filename in files_opt: 2880 # All or no nodes 2881 self._ErrorIf(missing_file and missing_file != expected_nodes, 2882 constants.CV_ECLUSTERFILECHECK, None, 2883 "File %s is optional, but it must exist on all or no" 2884 " nodes (not found on %s)", 2885 filename, 2886 utils.CommaJoin( 2887 utils.NiceSort( 2888 map(self.cfg.GetNodeName, missing_file)))) 2889 else: 2890 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None, 2891 "File %s is missing from node(s) %s", filename, 2892 utils.CommaJoin( 2893 utils.NiceSort( 2894 map(self.cfg.GetNodeName, missing_file)))) 2895 2896 # Warn if a node has a file it shouldn't 2897 unexpected = with_file - expected_nodes 2898 self._ErrorIf(unexpected, 2899 constants.CV_ECLUSTERFILECHECK, None, 2900 "File %s should not exist on node(s) %s", 2901 filename, utils.CommaJoin( 2902 utils.NiceSort(map(self.cfg.GetNodeName, unexpected)))) 2903 2904 # See if there are multiple versions of the file 2905 test = len(checksums) > 1 2906 if test: 2907 variants = ["variant %s on %s" % 2908 (idx + 1, 2909 utils.CommaJoin(utils.NiceSort( 2910 map(self.cfg.GetNodeName, node_uuids)))) 2911 for (idx, (checksum, node_uuids)) in 2912 
enumerate(sorted(checksums.items()))] 2913 else: 2914 variants = [] 2915 2916 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None, 2917 "File %s found with %s different checksums (%s)", 2918 filename, len(checksums), "; ".join(variants))
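# --- Illustrative sketch (not part of the Ganeti source): the heart of the
# file check is a filename -> {checksum -> set(nodes)} map; a file is healthy
# when a single checksum covers every node expected to have it. A standalone
# version that only distinguishes "missing" and "conflicting" files:
def file_problems(expected_nodes, reports):
  """reports: node -> {filename -> checksum}."""
  per_file = {}
  for node, files in reports.items():
    for fname, csum in files.items():
      per_file.setdefault(fname, {}).setdefault(csum, set()).add(node)
  missing, conflicting = {}, []
  for fname, by_csum in per_file.items():
    with_file = set().union(*by_csum.values())
    if expected_nodes - with_file:
      missing[fname] = expected_nodes - with_file
    if len(by_csum) > 1:
      conflicting.append(fname)
  return missing, conflicting

m, c = file_problems({"n1", "n2"},
                     {"n1": {"/etc/x": "aa"}, "n2": {"/etc/x": "bb"}})
assert m == {} and c == ["/etc/x"]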
2919
2920 - def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2921 """Verify the drbd helper. 2922 2923 """ 2924 if drbd_helper: 2925 helper_result = nresult.get(constants.NV_DRBDHELPER, None) 2926 test = (helper_result is None) 2927 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name, 2928 "no drbd usermode helper returned") 2929 if helper_result: 2930 status, payload = helper_result 2931 test = not status 2932 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name, 2933 "drbd usermode helper check unsuccessful: %s", payload) 2934 test = status and (payload != drbd_helper) 2935 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name, 2936 "wrong drbd usermode helper: %s", payload)
2937
2938 - def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, 2939 drbd_map):
2940 """Verifies and the node DRBD status. 2941 2942 @type ninfo: L{objects.Node} 2943 @param ninfo: the node to check 2944 @param nresult: the remote results for the node 2945 @param instanceinfo: the dict of instances 2946 @param drbd_helper: the configured DRBD usermode helper 2947 @param drbd_map: the DRBD map as returned by 2948 L{ganeti.config.ConfigWriter.ComputeDRBDMap} 2949 2950 """ 2951 self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper) 2952 2953 # compute the DRBD minors 2954 node_drbd = {} 2955 for minor, inst_uuid in drbd_map[ninfo.uuid].items(): 2956 test = inst_uuid not in instanceinfo 2957 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None, 2958 "ghost instance '%s' in temporary DRBD map", inst_uuid) 2959 # ghost instance should not be running, but otherwise we 2960 # don't give double warnings (both ghost instance and 2961 # unallocated minor in use) 2962 if test: 2963 node_drbd[minor] = (inst_uuid, False) 2964 else: 2965 instance = instanceinfo[inst_uuid] 2966 node_drbd[minor] = (inst_uuid, instance.disks_active) 2967 2968 # and now check them 2969 used_minors = nresult.get(constants.NV_DRBDLIST, []) 2970 test = not isinstance(used_minors, (tuple, list)) 2971 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name, 2972 "cannot parse drbd status file: %s", str(used_minors)) 2973 if test: 2974 # we cannot check drbd status 2975 return 2976 2977 for minor, (inst_uuid, must_exist) in node_drbd.items(): 2978 test = minor not in used_minors and must_exist 2979 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name, 2980 "drbd minor %d of instance %s is not active", minor, 2981 self.cfg.GetInstanceName(inst_uuid)) 2982 for minor in used_minors: 2983 test = minor not in node_drbd 2984 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name, 2985 "unallocated drbd minor %d is in use", minor)
2986
2987 - def _UpdateNodeOS(self, ninfo, nresult, nimg):
2988 """Builds the node OS structures. 2989 2990 @type ninfo: L{objects.Node} 2991 @param ninfo: the node to check 2992 @param nresult: the remote results for the node 2993 @param nimg: the node image object 2994 2995 """ 2996 remote_os = nresult.get(constants.NV_OSLIST, None) 2997 test = (not isinstance(remote_os, list) or 2998 not compat.all(isinstance(v, list) and len(v) == 8 2999 for v in remote_os)) 3000 3001 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name, 3002 "node hasn't returned valid OS data") 3003 3004 nimg.os_fail = test 3005 3006 if test: 3007 return 3008 3009 os_dict = {} 3010 3011 for (name, os_path, status, diagnose, 3012 variants, parameters, api_ver, 3013 trusted) in nresult[constants.NV_OSLIST]: 3014 3015 if name not in os_dict: 3016 os_dict[name] = [] 3017 3018 # parameters is a list of lists instead of list of tuples due to 3019 # JSON lacking a real tuple type, fix it: 3020 parameters = [tuple(v) for v in parameters] 3021 os_dict[name].append((os_path, status, diagnose, 3022 set(variants), set(parameters), set(api_ver), 3023 trusted)) 3024 3025 nimg.oslist = os_dict
3026
3027 - def _VerifyNodeOS(self, ninfo, nimg, base):
3028 """Verifies the node OS list. 3029 3030 @type ninfo: L{objects.Node} 3031 @param ninfo: the node to check 3032 @param nimg: the node image object 3033 @param base: the 'template' node we match against (e.g. from the master) 3034 3035 """ 3036 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" 3037 3038 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l] 3039 for os_name, os_data in nimg.oslist.items(): 3040 assert os_data, "Empty OS status for OS %s?!" % os_name 3041 f_path, f_status, f_diag, f_var, f_param, f_api, f_trusted = os_data[0] 3042 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name, 3043 "Invalid OS %s (located at %s): %s", 3044 os_name, f_path, f_diag) 3045 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name, 3046 "OS '%s' has multiple entries" 3047 " (first one shadows the rest): %s", 3048 os_name, utils.CommaJoin([v[0] for v in os_data])) 3049 # comparisons with the 'base' image 3050 test = os_name not in base.oslist 3051 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name, 3052 "Extra OS %s not present on reference node (%s)", 3053 os_name, self.cfg.GetNodeName(base.uuid)) 3054 if test: 3055 continue 3056 assert base.oslist[os_name], "Base node has empty OS status?" 3057 _, b_status, _, b_var, b_param, b_api, b_trusted = base.oslist[os_name][0] 3058 if not b_status: 3059 # base OS is invalid, skipping 3060 continue 3061 for kind, a, b in [("API version", f_api, b_api), 3062 ("variants list", f_var, b_var), 3063 ("parameters", beautify_params(f_param), 3064 beautify_params(b_param))]: 3065 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name, 3066 "OS %s for %s differs from reference node %s:" 3067 " [%s] vs. [%s]", kind, os_name, 3068 self.cfg.GetNodeName(base.uuid), 3069 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) 3070 for kind, a, b in [("trusted", f_trusted, b_trusted)]: 3071 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name, 3072 "OS %s for %s differs from reference node %s:" 3073 " %s vs. %s", kind, os_name, 3074 self.cfg.GetNodeName(base.uuid), a, b) 3075 3076 # check any missing OSes 3077 missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) 3078 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name, 3079 "OSes present on reference node %s" 3080 " but missing on this node: %s", 3081 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
3082
3083 - def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
3084 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}. 3085 3086 @type ninfo: L{objects.Node} 3087 @param ninfo: the node to check 3088 @param nresult: the remote results for the node 3089 @type is_master: bool 3090 @param is_master: Whether node is the master node 3091 3092 """ 3093 cluster = self.cfg.GetClusterInfo() 3094 if (is_master and 3095 (cluster.IsFileStorageEnabled() or 3096 cluster.IsSharedFileStorageEnabled())): 3097 try: 3098 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS] 3099 except KeyError: 3100 # This should never happen 3101 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name, 3102 "Node did not return forbidden file storage paths") 3103 else: 3104 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name, 3105 "Found forbidden file storage paths: %s", 3106 utils.CommaJoin(fspaths)) 3107 else: 3108 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult, 3109 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name, 3110 "Node should not have returned forbidden file storage" 3111 " paths")
3112
3113 - def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template, 3114 verify_key, error_key):
3115 """Verifies (file) storage paths. 3116 3117 @type ninfo: L{objects.Node} 3118 @param ninfo: the node to check 3119 @param nresult: the remote results for the node 3120 @type file_disk_template: string 3121 @param file_disk_template: file-based disk template, whose directory 3122 is supposed to be verified 3123 @type verify_key: string 3124 @param verify_key: key for the verification map of this file 3125 verification step 3126 @param error_key: error key to be added to the verification results 3127 in case something goes wrong in this verification step 3128 3129 """ 3130 assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes( 3131 constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER 3132 )) 3133 3134 cluster = self.cfg.GetClusterInfo() 3135 if cluster.IsDiskTemplateEnabled(file_disk_template): 3136 self._ErrorIf( 3137 verify_key in nresult, 3138 error_key, ninfo.name, 3139 "The configured %s storage path is unusable: %s" % 3140 (file_disk_template, nresult.get(verify_key)))
3141
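The helper only reports a problem when the corresponding file-based disk template is actually enabled and the node returned something under the given verification key. A small standalone sketch of that gating; the template names and keys below are placeholders, not the real DT_*/NV_*/CV_* constants:

# Sketch of the gating logic: a storage-path problem reported by a node
# only becomes an error if the matching file-based disk template is
# enabled on the cluster.
def storage_path_errors(enabled_templates, node_result, checks):
  errors = []
  for template, verify_key, error_key in checks:
    if template in enabled_templates and verify_key in node_result:
      errors.append((error_key, node_result[verify_key]))
  return errors

checks = [("file", "file-storage-path", "ENODEFILESTORAGEPATHUNUSABLE"),
          ("gluster", "gluster-storage-path", "ENODEGLUSTERSTORAGEPATHUNUSABLE")]
node_result = {"file-storage-path": "/srv/ganeti/file-storage is not writable"}
print(storage_path_errors({"file", "drbd"}, node_result, checks))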
3142 - def _VerifyFileStoragePaths(self, ninfo, nresult):
3143 """Verifies (file) storage paths. 3144 3145 @see: C{_VerifyStoragePaths} 3146 3147 """ 3148 self._VerifyStoragePaths( 3149 ninfo, nresult, constants.DT_FILE, 3150 constants.NV_FILE_STORAGE_PATH, 3151 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
3152
3153 - def _VerifySharedFileStoragePaths(self, ninfo, nresult):
3154 """Verifies (file) storage paths. 3155 3156 @see: C{_VerifyStoragePaths} 3157 3158 """ 3159 self._VerifyStoragePaths( 3160 ninfo, nresult, constants.DT_SHARED_FILE, 3161 constants.NV_SHARED_FILE_STORAGE_PATH, 3162 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
3163
3164 - def _VerifyGlusterStoragePaths(self, ninfo, nresult):
3165 """Verifies (file) storage paths. 3166 3167 @see: C{_VerifyStoragePaths} 3168 3169 """ 3170 self._VerifyStoragePaths( 3171 ninfo, nresult, constants.DT_GLUSTER, 3172 constants.NV_GLUSTER_STORAGE_PATH, 3173 constants.CV_ENODEGLUSTERSTORAGEPATHUNUSABLE)
3174
3175 - def _VerifyOob(self, ninfo, nresult):
3176 """Verifies out of band functionality of a node. 3177 3178 @type ninfo: L{objects.Node} 3179 @param ninfo: the node to check 3180 @param nresult: the remote results for the node 3181 3182 """ 3183 # We just have to verify the paths on master and/or master candidates 3184 # as the oob helper is invoked on the master 3185 if ((ninfo.master_candidate or ninfo.master_capable) and 3186 constants.NV_OOB_PATHS in nresult): 3187 for path_result in nresult[constants.NV_OOB_PATHS]: 3188 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, 3189 ninfo.name, path_result)
3190
3191 - def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
3192 """Verifies and updates the node volume data. 3193 3194 This function will update a L{NodeImage}'s internal structures 3195 with data from the remote call. 3196 3197 @type ninfo: L{objects.Node} 3198 @param ninfo: the node to check 3199 @param nresult: the remote results for the node 3200 @param nimg: the node image object 3201 @param vg_name: the configured VG name 3202 3203 """ 3204 nimg.lvm_fail = True 3205 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") 3206 if vg_name is None: 3207 pass 3208 elif isinstance(lvdata, basestring): 3209 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name, 3210 "LVM problem on node: %s", utils.SafeEncode(lvdata)) 3211 elif not isinstance(lvdata, dict): 3212 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name, 3213 "rpc call to node failed (lvlist)") 3214 else: 3215 nimg.volumes = lvdata 3216 nimg.lvm_fail = False
3217
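The volume update above distinguishes three payload shapes: a string is an LVM error message, any other non-dict value means the RPC itself failed, and a dict is the actual logical-volume map. A standalone sketch of the same dispatch (using a plain str check in place of the module's basestring test):

# Sketch of the three-way dispatch on the LV list payload: a string is an
# LVM error message, anything else that is not a dict is a broken RPC
# answer, and a dict maps "vg/lv" names to their attribute strings.
def interpret_lvlist(vg_name, lvdata):
  if vg_name is None:
    return ("skipped", None)
  if isinstance(lvdata, str):
    return ("lvm-error", lvdata)
  if not isinstance(lvdata, dict):
    return ("rpc-error", None)
  return ("ok", lvdata)

print(interpret_lvlist("xenvg", "Volume group xenvg not found"))
print(interpret_lvlist("xenvg", {"xenvg/disk0": "-wi-ao"}))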
3218 - def _UpdateNodeInstances(self, ninfo, nresult, nimg):
3219 """Verifies and updates the node instance list. 3220 3221 If the listing was successful, then updates this node's instance 3222 list. Otherwise, it marks the RPC call as failed for the instance 3223 list key. 3224 3225 @type ninfo: L{objects.Node} 3226 @param ninfo: the node to check 3227 @param nresult: the remote results for the node 3228 @param nimg: the node image object 3229 3230 """ 3231 idata = nresult.get(constants.NV_INSTANCELIST, None) 3232 test = not isinstance(idata, list) 3233 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name, 3234 "rpc call to node failed (instancelist): %s", 3235 utils.SafeEncode(str(idata))) 3236 if test: 3237 nimg.hyp_fail = True 3238 else: 3239 nimg.instances = [uuid for (uuid, _) in 3240 self.cfg.GetMultiInstanceInfoByName(idata)]
3241
3242 - def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
3243 """Verifies and computes a node information map 3244 3245 @type ninfo: L{objects.Node} 3246 @param ninfo: the node to check 3247 @param nresult: the remote results for the node 3248 @param nimg: the node image object 3249 @param vg_name: the configured VG name 3250 3251 """ 3252 # try to read free memory (from the hypervisor) 3253 hv_info = nresult.get(constants.NV_HVINFO, None) 3254 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info 3255 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name, 3256 "rpc call to node failed (hvinfo)") 3257 if not test: 3258 try: 3259 nimg.mfree = int(hv_info["memory_free"]) 3260 except (ValueError, TypeError): 3261 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name, 3262 "node returned invalid nodeinfo, check hypervisor") 3263 3264 # FIXME: devise a free space model for file based instances as well 3265 if vg_name is not None: 3266 test = (constants.NV_VGLIST not in nresult or 3267 vg_name not in nresult[constants.NV_VGLIST]) 3268 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name, 3269 "node didn't return data for the volume group '%s'" 3270 " - it is either missing or broken", vg_name) 3271 if not test: 3272 try: 3273 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name]) 3274 except (ValueError, TypeError): 3275 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name, 3276 "node returned invalid LVM info, check LVM status")
3277
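The update above has to be defensive: both the hypervisor's free-memory figure and the volume group's free space come from remote nodes and may be missing or malformed. A standalone sketch of that parsing, collecting problems in a list instead of calling _ErrorIf (function and variable names here are illustrative only):

# Sketch of the defensive parsing of per-node resource data: both values
# must be present and convertible to int, otherwise the node is flagged.
def parse_node_info(hv_info, vglist, vg_name):
  problems = []
  mfree = dfree = None
  if not isinstance(hv_info, dict) or "memory_free" not in hv_info:
    problems.append("hvinfo RPC failed")
  else:
    try:
      mfree = int(hv_info["memory_free"])
    except (ValueError, TypeError):
      problems.append("node returned invalid hypervisor info")
  if vg_name is not None:
    if not isinstance(vglist, dict) or vg_name not in vglist:
      problems.append("volume group %s missing or broken" % vg_name)
    else:
      try:
        dfree = int(vglist[vg_name])
      except (ValueError, TypeError):
        problems.append("node returned invalid LVM info")
  return mfree, dfree, problems

print(parse_node_info({"memory_free": "2048"}, {"xenvg": 51200}, "xenvg"))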
3278 - def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
3279 """Gets per-disk status information for all instances. 3280 3281 @type node_uuids: list of strings 3282 @param node_uuids: Node UUIDs 3283 @type node_image: dict of (UUID, L{objects.Node}) 3284 @param node_image: Node objects 3285 @type instanceinfo: dict of (UUID, L{objects.Instance}) 3286 @param instanceinfo: Instance objects 3287 @rtype: {instance: {node: [(succes, payload)]}} 3288 @return: a dictionary of per-instance dictionaries with nodes as 3289 keys and disk information as values; the disk information is a 3290 list of tuples (success, payload) 3291 3292 """ 3293 node_disks = {} 3294 node_disks_dev_inst_only = {} 3295 diskless_instances = set() 3296 nodisk_instances = set() 3297 diskless = constants.DT_DISKLESS 3298 3299 for nuuid in node_uuids: 3300 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst, 3301 node_image[nuuid].sinst)) 3302 diskless_instances.update(uuid for uuid in node_inst_uuids 3303 if instanceinfo[uuid].disk_template == diskless) 3304 disks = [(inst_uuid, disk) 3305 for inst_uuid in node_inst_uuids 3306 for disk in self.cfg.GetInstanceDisks(inst_uuid)] 3307 3308 if not disks: 3309 nodisk_instances.update(uuid for uuid in node_inst_uuids 3310 if instanceinfo[uuid].disk_template != diskless) 3311 # No need to collect data 3312 continue 3313 3314 node_disks[nuuid] = disks 3315 3316 # _AnnotateDiskParams makes already copies of the disks 3317 dev_inst_only = [] 3318 for (inst_uuid, dev) in disks: 3319 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev], 3320 self.cfg) 3321 dev_inst_only.append((anno_disk, instanceinfo[inst_uuid])) 3322 3323 node_disks_dev_inst_only[nuuid] = dev_inst_only 3324 3325 assert len(node_disks) == len(node_disks_dev_inst_only) 3326 3327 # Collect data from all nodes with disks 3328 result = self.rpc.call_blockdev_getmirrorstatus_multi( 3329 node_disks.keys(), node_disks_dev_inst_only) 3330 3331 assert len(result) == len(node_disks) 3332 3333 instdisk = {} 3334 3335 for (nuuid, nres) in result.items(): 3336 node = self.cfg.GetNodeInfo(nuuid) 3337 disks = node_disks[node.uuid] 3338 3339 if nres.offline: 3340 # No data from this node 3341 data = len(disks) * [(False, "node offline")] 3342 else: 3343 msg = nres.fail_msg 3344 self._ErrorIf(msg, constants.CV_ENODERPC, node.name, 3345 "while getting disk information: %s", msg) 3346 if msg: 3347 # No data from this node 3348 data = len(disks) * [(False, msg)] 3349 else: 3350 data = [] 3351 for idx, i in enumerate(nres.payload): 3352 if isinstance(i, (tuple, list)) and len(i) == 2: 3353 data.append(i) 3354 else: 3355 logging.warning("Invalid result from node %s, entry %d: %s", 3356 node.name, idx, i) 3357 data.append((False, "Invalid result from the remote node")) 3358 3359 for ((inst_uuid, _), status) in zip(disks, data): 3360 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \ 3361 .append(status) 3362 3363 # Add empty entries for diskless instances. 
3364 for inst_uuid in diskless_instances: 3365 assert inst_uuid not in instdisk 3366 instdisk[inst_uuid] = {} 3367 # ...and disk-full instances that happen to have no disks 3368 for inst_uuid in nodisk_instances: 3369 assert inst_uuid not in instdisk 3370 instdisk[inst_uuid] = {} 3371 3372 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and 3373 len(nuuids) <= len( 3374 self.cfg.GetInstanceNodes(instanceinfo[inst].uuid)) and 3375 compat.all(isinstance(s, (tuple, list)) and 3376 len(s) == 2 for s in statuses) 3377 for inst, nuuids in instdisk.items() 3378 for nuuid, statuses in nuuids.items()) 3379 if __debug__: 3380 instdisk_keys = set(instdisk) 3381 instanceinfo_keys = set(instanceinfo) 3382 assert instdisk_keys == instanceinfo_keys, \ 3383 ("instdisk keys (%s) do not match instanceinfo keys (%s)" % 3384 (instdisk_keys, instanceinfo_keys)) 3385 3386 return instdisk
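The result built above is a nested mapping of instance -> node -> list of (success, payload) tuples, with offline or failed nodes contributing padded (False, reason) entries. A compact standalone sketch of how that structure is assembled from per-node data (dummy names, not real config objects):

# Sketch of the result shape produced above: one (success, payload) tuple
# per disk, grouped per node and per instance; nodes without a usable
# answer contribute a padded list of (False, reason) entries.
def build_instdisk(per_node_disks, per_node_status):
  instdisk = {}
  for node, disks in per_node_disks.items():
    statuses = per_node_status.get(node)
    if statuses is None:
      statuses = [(False, "node offline")] * len(disks)
    for (inst, _), status in zip(disks, statuses):
      instdisk.setdefault(inst, {}).setdefault(node, []).append(status)
  return instdisk

per_node_disks = {"node1": [("inst1", "disk0"), ("inst1", "disk1")],
                  "node2": [("inst2", "disk0")]}
per_node_status = {"node1": [(True, {"sync": "done"}), (True, {"sync": "5%"})]}
print(build_instdisk(per_node_disks, per_node_status))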
3387 3388 @staticmethod
3389 - def _SshNodeSelector(group_uuid, all_nodes):
3390 """Create endless iterators for all potential SSH check hosts. 3391 3392 """ 3393 nodes = [node for node in all_nodes 3394 if (node.group != group_uuid and 3395 not node.offline)] 3396 keyfunc = operator.attrgetter("group") 3397 3398 return map(itertools.cycle, 3399 [sorted(map(operator.attrgetter("name"), names)) 3400 for _, names in itertools.groupby(sorted(nodes, key=keyfunc), 3401 keyfunc)])
3402 3403 @classmethod
3404 - def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
3405 """Choose which nodes should talk to which other nodes. 3406 3407 We will make nodes contact all nodes in their group, and one node from 3408 every other group. 3409 3410 @rtype: tuple of (string, dict of strings to list of strings, string) 3411 @return: a tuple containing the list of all online nodes, a dictionary 3412 mapping node names to additional nodes of other node groups to which 3413 connectivity should be tested, and a list of all online master 3414 candidates 3415 3416 @warning: This algorithm has a known issue if one node group is much 3417 smaller than others (e.g. just one node). In such a case all other 3418 nodes will talk to the single node. 3419 3420 """ 3421 online_nodes = sorted(node.name for node in group_nodes if not node.offline) 3422 online_mcs = sorted(node.name for node in group_nodes 3423 if (node.master_candidate and not node.offline)) 3424 sel = cls._SshNodeSelector(group_uuid, all_nodes) 3425 3426 return (online_nodes, 3427 dict((name, sorted([i.next() for i in sel])) 3428 for name in online_nodes), 3429 online_mcs)
3430
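Taken together, the two helpers above group all online nodes of other node groups, turn each group into an endless round-robin iterator, and let every online node of the local group draw one peer from each foreign group. A standalone sketch of the same idea over plain (name, group, offline) tuples (illustrative only, not the real node objects):

# Standalone sketch of the SSH peer selection: nodes of other groups are
# grouped by node group, each group becomes a round-robin cycle, and every
# local node draws the next peer from each foreign group's cycle.
import itertools
import operator

def ssh_check_map(local_nodes, local_group, all_nodes):
  foreign = sorted((n for n in all_nodes if n[1] != local_group and not n[2]),
                   key=operator.itemgetter(1))
  cycles = [itertools.cycle(sorted(n[0] for n in group))
            for _, group in itertools.groupby(foreign, operator.itemgetter(1))]
  return dict((name, sorted(next(c) for c in cycles))
              for name in sorted(local_nodes))

all_nodes = [("node1", "g1", False), ("node2", "g1", False),
             ("node3", "g2", False), ("node4", "g2", False),
             ("node5", "g3", False)]
print(ssh_check_map(["node1", "node2"], "g1", all_nodes))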
3431 - def _PrepareSshSetupCheck(self):
3432 """Prepare the input data for the SSH setup verification. 3433 3434 """ 3435 all_nodes_info = self.cfg.GetAllNodesInfo() 3436 potential_master_candidates = self.cfg.GetPotentialMasterCandidates() 3437 node_status = [ 3438 (uuid, node_info.name, node_info.master_candidate, 3439 node_info.name in potential_master_candidates, not node_info.offline) 3440 for (uuid, node_info) in all_nodes_info.items()] 3441 return node_status
3442
3443 - def BuildHooksEnv(self):
3444 """Build hooks env. 3445 3446 Cluster-Verify hooks just ran in the post phase and their failure makes 3447 the output be logged in the verify output and the verification to fail. 3448 3449 """ 3450 env = { 3451 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()), 3452 } 3453 3454 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) 3455 for node in self.my_node_info.values()) 3456 3457 return env
3458
3459 - def BuildHooksNodes(self):
3460 """Build hooks nodes. 3461 3462 """ 3463 return ([], list(self.my_node_info.keys()))
3464 3465 @staticmethod
3466 - def _VerifyOtherNotes(feedback_fn, i_non_redundant, i_non_a_balanced, 3467 i_offline, n_offline, n_drained):
3468 feedback_fn("* Other Notes") 3469 if i_non_redundant: 3470 feedback_fn(" - NOTICE: %d non-redundant instance(s) found." 3471 % len(i_non_redundant)) 3472 3473 if i_non_a_balanced: 3474 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found." 3475 % len(i_non_a_balanced)) 3476 3477 if i_offline: 3478 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline) 3479 3480 if n_offline: 3481 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline) 3482 3483 if n_drained: 3484 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3485
3486 - def Exec(self, feedback_fn): # pylint: disable=R0915
3487 """Verify integrity of the node group, performing various test on nodes. 3488 3489 """ 3490 # This method has too many local variables. pylint: disable=R0914 3491 feedback_fn("* Verifying group '%s'" % self.group_info.name) 3492 3493 if not self.my_node_uuids: 3494 # empty node group 3495 feedback_fn("* Empty node group, skipping verification") 3496 return True 3497 3498 self.bad = False 3499 verbose = self.op.verbose 3500 self._feedback_fn = feedback_fn 3501 3502 vg_name = self.cfg.GetVGName() 3503 drbd_helper = self.cfg.GetDRBDHelper() 3504 cluster = self.cfg.GetClusterInfo() 3505 hypervisors = cluster.enabled_hypervisors 3506 node_data_list = self.my_node_info.values() 3507 3508 i_non_redundant = [] # Non redundant instances 3509 i_non_a_balanced = [] # Non auto-balanced instances 3510 i_offline = 0 # Count of offline instances 3511 n_offline = 0 # Count of offline nodes 3512 n_drained = 0 # Count of nodes being drained 3513 node_vol_should = {} 3514 3515 # FIXME: verify OS list 3516 3517 # File verification 3518 filemap = ComputeAncillaryFiles(cluster, False) 3519 3520 # do local checksums 3521 master_node_uuid = self.master_node = self.cfg.GetMasterNode() 3522 master_ip = self.cfg.GetMasterIP() 3523 3524 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids)) 3525 3526 user_scripts = [] 3527 if self.cfg.GetUseExternalMipScript(): 3528 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT) 3529 3530 node_verify_param = { 3531 constants.NV_FILELIST: 3532 map(vcluster.MakeVirtualPath, 3533 utils.UniqueSequence(filename 3534 for files in filemap 3535 for filename in files)), 3536 constants.NV_NODELIST: 3537 self._SelectSshCheckNodes(node_data_list, self.group_uuid, 3538 self.all_node_info.values()), 3539 constants.NV_HYPERVISOR: hypervisors, 3540 constants.NV_HVPARAMS: 3541 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), 3542 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) 3543 for node in node_data_list 3544 if not node.offline], 3545 constants.NV_INSTANCELIST: hypervisors, 3546 constants.NV_VERSION: None, 3547 constants.NV_HVINFO: self.cfg.GetHypervisorType(), 3548 constants.NV_NODESETUP: None, 3549 constants.NV_TIME: None, 3550 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip), 3551 constants.NV_OSLIST: None, 3552 constants.NV_NONVMNODES: self.cfg.GetNonVmCapableNodeNameList(), 3553 constants.NV_USERSCRIPTS: user_scripts, 3554 constants.NV_CLIENT_CERT: None, 3555 } 3556 3557 if self.cfg.GetClusterInfo().modify_ssh_setup: 3558 node_verify_param[constants.NV_SSH_SETUP] = self._PrepareSshSetupCheck() 3559 if self.op.verify_clutter: 3560 node_verify_param[constants.NV_SSH_CLUTTER] = True 3561 3562 if vg_name is not None: 3563 node_verify_param[constants.NV_VGLIST] = None 3564 node_verify_param[constants.NV_LVLIST] = vg_name 3565 node_verify_param[constants.NV_PVLIST] = [vg_name] 3566 3567 if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8): 3568 if drbd_helper: 3569 node_verify_param[constants.NV_DRBDVERSION] = None 3570 node_verify_param[constants.NV_DRBDLIST] = None 3571 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper 3572 3573 if cluster.IsFileStorageEnabled() or \ 3574 cluster.IsSharedFileStorageEnabled(): 3575 # Load file storage paths only from master node 3576 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \ 3577 self.cfg.GetMasterNodeName() 3578 if cluster.IsFileStorageEnabled(): 3579 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \ 3580 cluster.file_storage_dir 3581 if 
cluster.IsSharedFileStorageEnabled(): 3582 node_verify_param[constants.NV_SHARED_FILE_STORAGE_PATH] = \ 3583 cluster.shared_file_storage_dir 3584 3585 # bridge checks 3586 # FIXME: this needs to be changed per node-group, not cluster-wide 3587 bridges = set() 3588 default_nicpp = cluster.nicparams[constants.PP_DEFAULT] 3589 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: 3590 bridges.add(default_nicpp[constants.NIC_LINK]) 3591 for inst_uuid in self.my_inst_info.values(): 3592 for nic in inst_uuid.nics: 3593 full_nic = cluster.SimpleFillNIC(nic.nicparams) 3594 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: 3595 bridges.add(full_nic[constants.NIC_LINK]) 3596 3597 if bridges: 3598 node_verify_param[constants.NV_BRIDGES] = list(bridges) 3599 3600 # Build our expected cluster state 3601 node_image = dict((node.uuid, self.NodeImage(offline=node.offline, 3602 uuid=node.uuid, 3603 vm_capable=node.vm_capable)) 3604 for node in node_data_list) 3605 3606 # Gather OOB paths 3607 oob_paths = [] 3608 for node in self.all_node_info.values(): 3609 path = SupportsOob(self.cfg, node) 3610 if path and path not in oob_paths: 3611 oob_paths.append(path) 3612 3613 if oob_paths: 3614 node_verify_param[constants.NV_OOB_PATHS] = oob_paths 3615 3616 for inst_uuid in self.my_inst_uuids: 3617 instance = self.my_inst_info[inst_uuid] 3618 if instance.admin_state == constants.ADMINST_OFFLINE: 3619 i_offline += 1 3620 3621 inst_nodes = self.cfg.GetInstanceNodes(instance.uuid) 3622 for nuuid in inst_nodes: 3623 if nuuid not in node_image: 3624 gnode = self.NodeImage(uuid=nuuid) 3625 gnode.ghost = (nuuid not in self.all_node_info) 3626 node_image[nuuid] = gnode 3627 3628 self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should) 3629 3630 pnode = instance.primary_node 3631 node_image[pnode].pinst.append(instance.uuid) 3632 3633 for snode in self.cfg.GetInstanceSecondaryNodes(instance.uuid): 3634 nimg = node_image[snode] 3635 nimg.sinst.append(instance.uuid) 3636 if pnode not in nimg.sbp: 3637 nimg.sbp[pnode] = [] 3638 nimg.sbp[pnode].append(instance.uuid) 3639 3640 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, 3641 self.my_node_info.keys()) 3642 # The value of exclusive_storage should be the same across the group, so if 3643 # it's True for at least a node, we act as if it were set for all the nodes 3644 self._exclusive_storage = compat.any(es_flags.values()) 3645 if self._exclusive_storage: 3646 node_verify_param[constants.NV_EXCLUSIVEPVS] = True 3647 3648 node_group_uuids = dict(map(lambda n: (n.name, n.group), 3649 self.cfg.GetAllNodesInfo().values())) 3650 groups_config = self.cfg.GetAllNodeGroupsInfoDict() 3651 3652 # At this point, we have the in-memory data structures complete, 3653 # except for the runtime information, which we'll gather next 3654 3655 # NOTE: Here we lock the configuration for the duration of RPC calls, 3656 # which means that the cluster configuration changes are blocked during 3657 # this period. 3658 # This is something that should be done only exceptionally and only for 3659 # justified cases! 3660 # In this case, we need the lock as we can only verify the integrity of 3661 # configuration files on MCs only if we know nobody else is modifying it. 3662 # FIXME: The check for integrity of config.data should be moved to 3663 # WConfD, which is the only one who can otherwise ensure nobody 3664 # will modify the configuration during the check. 
3665 with self.cfg.GetConfigManager(shared=True): 3666 feedback_fn("* Gathering information about nodes (%s nodes)" % 3667 len(self.my_node_uuids)) 3668 # Force the configuration to be fully distributed before doing any tests 3669 self.cfg.FlushConfig() 3670 # Due to the way our RPC system works, exact response times cannot be 3671 # guaranteed (e.g. a broken node could run into a timeout). By keeping 3672 # the time before and after executing the request, we can at least have 3673 # a time window. 3674 nvinfo_starttime = time.time() 3675 # Get lock on the configuration so that nobody modifies it concurrently. 3676 # Otherwise it can be modified by other jobs, failing the consistency 3677 # test. 3678 # NOTE: This is an exceptional situation, we should otherwise avoid 3679 # locking the configuration for something but very fast, pure operations. 3680 cluster_name = self.cfg.GetClusterName() 3681 hvparams = self.cfg.GetClusterInfo().hvparams 3682 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids, 3683 node_verify_param, 3684 cluster_name, 3685 hvparams, 3686 node_group_uuids, 3687 groups_config) 3688 nvinfo_endtime = time.time() 3689 3690 if self.extra_lv_nodes and vg_name is not None: 3691 feedback_fn("* Gathering information about extra nodes (%s nodes)" % 3692 len(self.extra_lv_nodes)) 3693 extra_lv_nvinfo = \ 3694 self.rpc.call_node_verify(self.extra_lv_nodes, 3695 {constants.NV_LVLIST: vg_name}, 3696 self.cfg.GetClusterName(), 3697 self.cfg.GetClusterInfo().hvparams, 3698 node_group_uuids, 3699 groups_config) 3700 else: 3701 extra_lv_nvinfo = {} 3702 3703 # If not all nodes are being checked, we need to make sure the master 3704 # node and a non-checked vm_capable node are in the list. 3705 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info) 3706 if absent_node_uuids: 3707 vf_nvinfo = all_nvinfo.copy() 3708 vf_node_info = list(self.my_node_info.values()) 3709 additional_node_uuids = [] 3710 if master_node_uuid not in self.my_node_info: 3711 additional_node_uuids.append(master_node_uuid) 3712 vf_node_info.append(self.all_node_info[master_node_uuid]) 3713 # Add the first vm_capable node we find which is not included, 3714 # excluding the master node (which we already have) 3715 for node_uuid in absent_node_uuids: 3716 nodeinfo = self.all_node_info[node_uuid] 3717 if (nodeinfo.vm_capable and not nodeinfo.offline and 3718 node_uuid != master_node_uuid): 3719 additional_node_uuids.append(node_uuid) 3720 vf_node_info.append(self.all_node_info[node_uuid]) 3721 break 3722 key = constants.NV_FILELIST 3723 3724 feedback_fn("* Gathering information about the master node") 3725 vf_nvinfo.update(self.rpc.call_node_verify( 3726 additional_node_uuids, {key: node_verify_param[key]}, 3727 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams, 3728 node_group_uuids, 3729 groups_config)) 3730 else: 3731 vf_nvinfo = all_nvinfo 3732 vf_node_info = self.my_node_info.values() 3733 3734 all_drbd_map = self.cfg.ComputeDRBDMap() 3735 3736 feedback_fn("* Gathering disk information (%s nodes)" % 3737 len(self.my_node_uuids)) 3738 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image, 3739 self.my_inst_info) 3740 3741 feedback_fn("* Verifying configuration file consistency") 3742 3743 self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo) 3744 if self.cfg.GetClusterInfo().modify_ssh_setup: 3745 self._VerifySshSetup(self.my_node_info.values(), all_nvinfo) 3746 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap) 3747 3748 
feedback_fn("* Verifying node status") 3749 3750 refos_img = None 3751 3752 for node_i in node_data_list: 3753 nimg = node_image[node_i.uuid] 3754 3755 if node_i.offline: 3756 if verbose: 3757 feedback_fn("* Skipping offline node %s" % (node_i.name,)) 3758 n_offline += 1 3759 continue 3760 3761 if node_i.uuid == master_node_uuid: 3762 ntype = "master" 3763 elif node_i.master_candidate: 3764 ntype = "master candidate" 3765 elif node_i.drained: 3766 ntype = "drained" 3767 n_drained += 1 3768 else: 3769 ntype = "regular" 3770 if verbose: 3771 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype)) 3772 3773 msg = all_nvinfo[node_i.uuid].fail_msg 3774 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name, 3775 "while contacting node: %s", msg) 3776 if msg: 3777 nimg.rpc_fail = True 3778 continue 3779 3780 nresult = all_nvinfo[node_i.uuid].payload 3781 3782 nimg.call_ok = self._VerifyNode(node_i, nresult) 3783 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) 3784 self._VerifyNodeNetwork(node_i, nresult) 3785 self._VerifyNodeUserScripts(node_i, nresult) 3786 self._VerifyOob(node_i, nresult) 3787 self._VerifyAcceptedFileStoragePaths(node_i, nresult, 3788 node_i.uuid == master_node_uuid) 3789 self._VerifyFileStoragePaths(node_i, nresult) 3790 self._VerifySharedFileStoragePaths(node_i, nresult) 3791 self._VerifyGlusterStoragePaths(node_i, nresult) 3792 3793 if nimg.vm_capable: 3794 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg) 3795 if constants.DT_DRBD8 in cluster.enabled_disk_templates: 3796 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, 3797 all_drbd_map) 3798 3799 if (constants.DT_PLAIN in cluster.enabled_disk_templates) or \ 3800 (constants.DT_DRBD8 in cluster.enabled_disk_templates): 3801 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) 3802 self._UpdateNodeInstances(node_i, nresult, nimg) 3803 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) 3804 self._UpdateNodeOS(node_i, nresult, nimg) 3805 3806 if not nimg.os_fail: 3807 if refos_img is None: 3808 refos_img = nimg 3809 self._VerifyNodeOS(node_i, nimg, refos_img) 3810 self._VerifyNodeBridges(node_i, nresult, bridges) 3811 3812 # Check whether all running instances are primary for the node. (This 3813 # can no longer be done from _VerifyInstance below, since some of the 3814 # wrong instances could be from other node groups.) 3815 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst) 3816 3817 for inst_uuid in non_primary_inst_uuids: 3818 test = inst_uuid in self.all_inst_info 3819 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, 3820 self.cfg.GetInstanceName(inst_uuid), 3821 "instance should not run on node %s", node_i.name) 3822 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name, 3823 "node is running unknown instance %s", inst_uuid) 3824 3825 self._VerifyGroupDRBDVersion(all_nvinfo) 3826 self._VerifyGroupLVM(node_image, vg_name) 3827 3828 for node_uuid, result in extra_lv_nvinfo.items(): 3829 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload, 3830 node_image[node_uuid], vg_name) 3831 3832 feedback_fn("* Verifying instance status") 3833 for inst_uuid in self.my_inst_uuids: 3834 instance = self.my_inst_info[inst_uuid] 3835 if verbose: 3836 feedback_fn("* Verifying instance %s" % instance.name) 3837 self._VerifyInstance(instance, node_image, instdisk[inst_uuid]) 3838 3839 # If the instance is non-redundant we cannot survive losing its primary 3840 # node, so we are not N+1 compliant. 
3841 if instance.disk_template not in constants.DTS_MIRRORED: 3842 i_non_redundant.append(instance) 3843 3844 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]: 3845 i_non_a_balanced.append(instance) 3846 3847 feedback_fn("* Verifying orphan volumes") 3848 reserved = utils.FieldSet(*cluster.reserved_lvs) 3849 3850 # We will get spurious "unknown volume" warnings if any node of this group 3851 # is secondary for an instance whose primary is in another group. To avoid 3852 # them, we find these instances and add their volumes to node_vol_should. 3853 for instance in self.all_inst_info.values(): 3854 for secondary in self.cfg.GetInstanceSecondaryNodes(instance.uuid): 3855 if (secondary in self.my_node_info 3856 and instance.name not in self.my_inst_info): 3857 self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should) 3858 break 3859 3860 self._VerifyOrphanVolumes(vg_name, node_vol_should, node_image, reserved) 3861 3862 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: 3863 feedback_fn("* Verifying N+1 Memory redundancy") 3864 self._VerifyNPlusOneMemory(node_image, self.my_inst_info) 3865 3866 self._VerifyOtherNotes(feedback_fn, i_non_redundant, i_non_a_balanced, 3867 i_offline, n_offline, n_drained) 3868 3869 return not self.bad
3870
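One of the preparation steps in Exec above is collecting every bridge that should exist on the group's nodes: the default NIC link when the default NIC mode is bridged, plus the links of all instance NICs whose effective mode is bridged. A standalone sketch of that collection, approximating SimpleFillNIC with a plain dict merge (the "mode"/"link"/"bridged" keys and values are placeholders for the real NIC_* constants):

# Sketch of the bridge collection step: start from the cluster-wide default
# NIC parameters and add the links of all instance NICs whose effective
# (defaults merged with overrides) mode is "bridged".
def collect_bridges(default_nicparams, instance_nics):
  bridges = set()
  if default_nicparams.get("mode") == "bridged":
    bridges.add(default_nicparams["link"])
  for nicparams in instance_nics:
    effective = dict(default_nicparams, **nicparams)
    if effective.get("mode") == "bridged":
      bridges.add(effective["link"])
  return bridges

default_nicparams = {"mode": "bridged", "link": "br0"}
instance_nics = [{}, {"link": "br-vlan42"}, {"mode": "routed", "link": "eth0"}]
print(sorted(collect_bridges(default_nicparams, instance_nics)))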
3871 - def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3872 """Analyze the post-hooks' result 3873 3874 This method analyses the hook result, handles it, and sends some 3875 nicely-formatted feedback back to the user. 3876 3877 @param phase: one of L{constants.HOOKS_PHASE_POST} or 3878 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase 3879 @param hooks_results: the results of the multi-node hooks rpc call 3880 @param feedback_fn: function used send feedback back to the caller 3881 @param lu_result: previous Exec result 3882 @return: the new Exec result, based on the previous result 3883 and hook results 3884 3885 """ 3886 # We only really run POST phase hooks, only for non-empty groups, 3887 # and are only interested in their results 3888 if not self.my_node_uuids: 3889 # empty node group 3890 pass 3891 elif phase == constants.HOOKS_PHASE_POST: 3892 # Used to change hooks' output to proper indentation 3893 feedback_fn("* Hooks Results") 3894 assert hooks_results, "invalid result from hooks" 3895 3896 for node_name in hooks_results: 3897 res = hooks_results[node_name] 3898 msg = res.fail_msg 3899 test = msg and not res.offline 3900 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, 3901 "Communication failure in hooks execution: %s", msg) 3902 if test: 3903 lu_result = False 3904 continue 3905 if res.offline: 3906 # No need to investigate payload if node is offline 3907 continue 3908 for script, hkr, output in res.payload: 3909 test = hkr == constants.HKR_FAIL 3910 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, 3911 "Script %s failed, output:", script) 3912 if test: 3913 output = self._HOOKS_INDENT_RE.sub(" ", output) 3914 feedback_fn("%s" % output) 3915 lu_result = False 3916 3917 return lu_result
3918
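HooksCallBack above only inspects POST-phase results: an RPC failure on an online node or any hook script reporting failure flips the overall verification result to False, while offline nodes are skipped entirely. A standalone sketch of that evaluation over simple (fail_msg, offline, payload) records; "fail" stands in for HKR_FAIL and nothing here is the real RPC result class:

# Sketch of the post-hook evaluation: an RPC failure on an online node or
# any hook script reporting failure turns the overall result to False.
# Each node record is (fail_msg, offline, payload); payload is a list of
# (script, status, output) tuples.
def evaluate_hooks(hooks_results, lu_result=True):
  for node, (fail_msg, offline, payload) in sorted(hooks_results.items()):
    if fail_msg and not offline:
      lu_result = False
      continue
    if offline:
      continue
    for script, status, output in payload:
      if status == "fail":
        print("node %s: script %s failed: %s" % (node, script, output))
        lu_result = False
  return lu_result

results = {"node1": (None, False, [("10-check", "success", ""),
                                   ("20-custom", "fail", "missing binary")]),
           "node2": ("timeout", True, [])}
print(evaluate_hooks(results))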
3919 3920 -class LUClusterVerifyDisks(NoHooksLU):
3921 """Verifies the cluster disks status. 3922 3923 """ 3924 REQ_BGL = False 3925
3926 - def ExpandNames(self):
3927 self.share_locks = ShareAll() 3928 self.needed_locks = { 3929 locking.LEVEL_NODEGROUP: locking.ALL_SET, 3930 }
3931
3932 - def Exec(self, feedback_fn):
3933 group_names = self.owned_locks(locking.LEVEL_NODEGROUP) 3934 3935 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group 3936 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)] 3937 for group in group_names])
3938