
Source Code for Module ganeti.tools.burnin

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))
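
# Illustrative note (not part of the original source): given the sets above,
# _IMPEXP_DISK_TEMPLATES works out to the supported templates minus diskless
# and the file-based ones, i.e. drbd, ext, plain and rbd.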


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
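
# Example (illustrative, not part of the original source):
#   Log("instance %s", "inst1", indent=1)
# writes "  * instance inst1": two leading spaces per indent level plus the
# LOG_HEADERS prefix for that level ("- " at 0, "* " at 1, "" at 2).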


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read()  # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name",
                 action="store_false", default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
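
# Example invocation (illustrative, not part of the original source; the OS
# name and instance names are placeholders):
#   burnin -o my-os -t drbd --disk-size 1G,256M -p inst1.example.com \
#     inst2.example.com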


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance)  # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
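
# Illustrative sketch (not part of the original source): the two decorators
# are stacked on the Burn* methods as
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# _DoBatch brackets the method with StartBatch/CommitQueue, and the outer
# _DoCheckInstances then runs _CheckInstanceAlive on every instance once the
# batch has been committed.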


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err:  # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the recursive call's value so a successful retry
        # propagates its result to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
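
  # Illustrative example (not part of the original source): ExecOp below
  # calls this as self.MaybeRetry(MAX_RETRIES, "opcode", self._ExecOp, *ops),
  # so the first attempt sees retry_count == MAX_RETRIES, the final retry
  # sees retry_count == 1, and retry_count == 0 marks a non-retryable call.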

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops)  # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops)  # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err:  # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err:  # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)
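
  # Illustrative example (not part of the original source): passing
  # "--disk-size 1G,256M --disk-growth 256M,128M" yields disk_size ==
  # [1024, 256] and disk_growth == [256, 128] (values in MiB), hence
  # disk_count == 2; lists of different lengths abort the program.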

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
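
  # Illustrative note (not part of the original source): for -C/--http-check
  # to pass, each instance must serve its own name at
  # http://<instance>/hostname.txt, e.g. via a small web server started by
  # the burnin OS image; otherwise the check above raises InstanceDown.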

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        self.opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if self.opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if self.opts.do_replace1 and \
         self.opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (self.opts.do_replace2 and len(self.nodes) > 2 and
          self.opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (self.opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if self.opts.do_failover and \
         self.opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if self.opts.do_migrate:
        if self.opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              self.opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (self.opts.do_move and len(self.nodes) > 1 and
          self.opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (self.opts.do_importexport and
          self.opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if self.opts.do_reinstall:
        self.BurnReinstall()

      if self.opts.do_reboot:
        self.BurnReboot()

      if self.opts.do_renamesame:
        self.BurnRenameSame()

      if self.opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if self.opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if self.opts.do_activate_disks:
        self.BurnActivateDisks()

      if self.opts.rename:
        self.BurnRename()

      if self.opts.do_confd_tests:
        self.BurnConfd()

      if self.opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err:  # already detected errors, so errors in removal
            # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else:  # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()