
Source Code for Module ganeti.tools.burnin

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client
from ganeti.runtime import GetClient


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  constants.DT_GLUSTER
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  constants.DT_GLUSTER
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_GLUSTER
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""
  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)
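
# Illustrative examples of Log() output (comments only, not executed):
# indentation is two spaces per level and the header comes from LOG_HEADERS,
# so, with a hypothetical instance name:
#
#   Log("Creating instances")              # prints "- Creating instances"
#   Log("instance %s", "inst1", indent=1)  # prints "  * instance inst1"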


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read()  # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="1G", type="string", metavar="<size,size,...>",
                 completion_suggest=("512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name",
                 action="store_false", default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
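
# Illustrative invocation (the OS and instance names are hypothetical; the
# option list above defines the full interface):
#
#   burnin -o debootstrap -t drbd -p \
#     instance1.example.com instance2.example.com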


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance)  # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
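
# Illustrative sketch of how the two decorators above compose (not part of
# the module): _DoBatch must be the innermost decorator, so a burn step runs
# as StartBatch() -> step body -> CommitQueue(), and only afterwards does
# _DoCheckInstances verify every instance:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...  # queue or execute opcodes via ExecOrQueue()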


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err:  # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried result; without the return, a successful
        # retry would hand None back to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
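
  # Illustrative usage of ExecOp() (comments only): the first argument
  # selects the retry behaviour, e.g. for a hypothetical opcode "op":
  #
  #   self.ExecOp(True, op)   # idempotent: retried up to MAX_RETRIES times
  #   self.ExecOp(False, op)  # non-idempotent: a failure aborts immediately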

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode list directly, or queue it for batch execution."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops)  # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops)  # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err:  # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err:  # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val
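
  # Illustrative sketch of the batching flow that the _DoBatch decorator
  # drives (comments only): in --parallel mode ExecOrQueue() merely queues
  # jobs, and CommitQueue() submits them as one job set via ExecJobSet():
  #
  #   self.StartBatch(retry=False)
  #   for instance in self.instances:
  #     self.ExecOrQueue(instance, [some_op])  # "some_op" is hypothetical
  #   self.CommitQueue()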

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)
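
  # Illustrative example (comments only): the disk count is derived from the
  # comma-separated --disk-size value, so "--disk-size 4G,1G,1G" together
  # with "--disk-growth 1G,128M,128M" yields disk_count == 3;
  # utils.ParseUnit() normalizes each value to mebibytes (e.g. "4G" -> 4096).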

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      qcl = GetClient()
      result = qcl.QueryNodes(names, ["name", "offline", "drained"], False)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    finally:
      qcl.Close()
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True,
                                          ignore_ipolicy=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    qcl = GetClient()
    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      ((full_name, ), ) = qcl.QueryInstances([instance], ["name"], False)

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
    qcl.Close()

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new, name_check, ip_check):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new,
                                    name_check=name_check,
                                    ip_check=ip_check)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self, name_check, ip_check):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename, name_check,
                                         ip_check)
      op_rename2 = self.RenameInstanceOp(rename, instance, name_check,
                                         ip_check)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self, name_check, ip_check):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance, name_check, ip_check)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
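
  # Illustrative note on -C/--http-check (an assumption about the test OS
  # image, not something this module enforces): each instance must serve its
  # own name over HTTP for the check above to pass, e.g.:
  #
  #   $ curl http://inst1.example.com/hostname.txt
  #   inst1.example.com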

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        self.opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if self.opts.do_replace1 and \
           self.opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (self.opts.do_replace2 and len(self.nodes) > 2 and
          self.opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (self.opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if self.opts.do_failover and \
           self.opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if self.opts.do_migrate:
        if self.opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              self.opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (self.opts.do_move and len(self.nodes) > 1 and
          self.opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (self.opts.do_importexport and
          self.opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if self.opts.do_reinstall:
        self.BurnReinstall()

      if self.opts.do_reboot:
        self.BurnReboot()

      if self.opts.do_renamesame:
        self.BurnRenameSame(self.opts.name_check, self.opts.ip_check)

      if self.opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if self.opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if self.opts.do_activate_disks:
        self.BurnActivateDisks()

      if self.opts.rename:
        self.BurnRename(self.opts.name_check, self.opts.ip_check)

      if self.opts.do_confd_tests:
        self.BurnConfd()

      if self.opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err:
            # we already detected errors, so errors in removal
            # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else:
            # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()
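
# Minimal sketch of how this module is typically wired up as an executable
# (an assumption; the installed "burnin" script performs the equivalent):
#
#   if __name__ == "__main__":
#     sys.exit(Main())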