
Source Code for Module ganeti.tools.burnin

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
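
# For illustration: Log("instance %s", "inst1", indent=1) writes
# "  * instance inst1\n" to stdout -- two spaces per indent level,
# followed by the header for that level from LOG_HEADERS.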


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read()  # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name",
                 action="store_false", default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
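
# Example invocation (hypothetical OS, node and instance names):
#
#   burnin -o debian-installer -t drbd -p \
#     --disk-size=1G,512M --mem-size=512M \
#     instance1.example.com instance2.example.com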


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance)  # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
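
# The two decorators compose; the Burn* methods below use them as e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# so that queued jobs are committed first (_DoBatch) and the liveness
# check then runs over all instances (_DoCheckInstances).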


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err:  # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the recursive result so that a successful retry does
        # not silently yield None to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
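
  # For example, ExecOp below calls
  #   self.MaybeRetry(MAX_RETRIES, "opcode", self._ExecOp, *ops)
  # for retryable opcodes (up to MAX_RETRIES attempts), and passes 0
  # instead of MAX_RETRIES to disable retrying entirely.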

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops)  # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops)  # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err:  # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err:  # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)
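
  # For illustration (hypothetical values): with "--disk-size 1G,512M
  # --disk-growth 256M,128M", ParseOptions sets disk_size to [1024, 512]
  # and disk_growth to [256, 128] (both in MiB), i.e. two disks per
  # instance.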

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op3 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
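
  # The check above assumes the burnin OS image runs an HTTP server on
  # the instance serving the instance's own hostname at /hostname.txt;
  # a minimal way to provide that (an assumption about the test image,
  # not part of this module) is to write the hostname to hostname.txt
  # in some directory and serve it with "python -m SimpleHTTPServer 80".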

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err:  # already detected errors, so errors in removal
                       # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else:  # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()
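

# This module has no __main__ guard; the installed burnin tool is
# expected to call Main() and exit with its return value. A minimal
# wrapper (a sketch under that assumption, not code shipped in this
# module) would be:
#
#   import sys
#   from ganeti.tools import burnin
#   sys.exit(burnin.Main())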