
Source Code for Module ganeti.tools.burnin

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""
  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, " "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read()  # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name",
                 action="store_false", default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance)  # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
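
# Decorator stacking (editorial note, not in the original source): burn
# steps below are declared as
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
#
# so _DoBatch is applied first: it starts a batch, lets the method queue
# (or directly execute) its opcodes and commits the queue, after which
# _DoCheckInstances verifies every instance once the whole batch is done.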


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err:  # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the recursive result so retried operations propagate
        # their value to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops)  # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops)  # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err:  # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err:  # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        self.opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if self.opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if self.opts.do_replace1 and \
           self.opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (self.opts.do_replace2 and len(self.nodes) > 2 and
          self.opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (self.opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if self.opts.do_failover and \
           self.opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if self.opts.do_migrate:
        if self.opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              self.opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (self.opts.do_move and len(self.nodes) > 1 and
          self.opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (self.opts.do_importexport and
          self.opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if self.opts.do_reinstall:
        self.BurnReinstall()

      if self.opts.do_reboot:
        self.BurnReboot()

      if self.opts.do_renamesame:
        self.BurnRenameSame()

      if self.opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if self.opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if self.opts.do_activate_disks:
        self.BurnActivateDisks()

      if self.opts.rename:
        self.BurnRename()

      if self.opts.do_confd_tests:
        self.BurnConfd()

      if self.opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err:  # already detected errors, so errors in removal
                       # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else:  # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()