Package ganeti :: Package client :: Module gnt_debug
[hide private]
[frames] | no frames]

Source Code for Module ganeti.client.gnt_debug

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """Debugging commands""" 
 22   
 23  # pylint: disable-msg=W0401,W0614,C0103 
 24  # W0401: Wildcard import ganeti.cli 
 25  # W0614: Unused import %s from wildcard import (since we need cli) 
 26  # C0103: Invalid name gnt-backup 
 27   
 28  import simplejson 
 29  import time 
 30  import socket 
 31  import logging 
 32   
 33  from ganeti.cli import * 
 34  from ganeti import cli 
 35  from ganeti import constants 
 36  from ganeti import opcodes 
 37  from ganeti import utils 
 38  from ganeti import errors 
 39  from ganeti import compat 
 40   
 41   
 42  #: Default fields for L{ListLocks} 
 43  _LIST_LOCKS_DEF_FIELDS = [ 
 44    "name", 
 45    "mode", 
 46    "owner", 
 47    "pending", 
 48    ] 
 49   
 50   
51 -def Delay(opts, args):
52 """Sleeps for a while 53 54 @param opts: the command line options selected by the user 55 @type args: list 56 @param args: should contain only one element, the duration 57 the sleep 58 @rtype: int 59 @return: the desired exit code 60 61 """ 62 delay = float(args[0]) 63 op = opcodes.OpTestDelay(duration=delay, 64 on_master=opts.on_master, 65 on_nodes=opts.on_nodes, 66 repeat=opts.repeat) 67 SubmitOpCode(op, opts=opts) 68 69 return 0
70 71
72 -def GenericOpCodes(opts, args):
73 """Send any opcode to the master. 74 75 @param opts: the command line options selected by the user 76 @type args: list 77 @param args: should contain only one element, the path of 78 the file with the opcode definition 79 @rtype: int 80 @return: the desired exit code 81 82 """ 83 cl = cli.GetClient() 84 jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts) 85 86 job_cnt = 0 87 op_cnt = 0 88 if opts.timing_stats: 89 ToStdout("Loading...") 90 for job_idx in range(opts.rep_job): 91 for fname in args: 92 # pylint: disable-msg=W0142 93 op_data = simplejson.loads(utils.ReadFile(fname)) 94 op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data] 95 op_list = op_list * opts.rep_op 96 jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list) 97 op_cnt += len(op_list) 98 job_cnt += 1 99 100 if opts.timing_stats: 101 t1 = time.time() 102 ToStdout("Submitting...") 103 104 jex.SubmitPending(each=opts.each) 105 106 if opts.timing_stats: 107 t2 = time.time() 108 ToStdout("Executing...") 109 110 jex.GetResults() 111 if opts.timing_stats: 112 t3 = time.time() 113 ToStdout("C:op %4d" % op_cnt) 114 ToStdout("C:job %4d" % job_cnt) 115 ToStdout("T:submit %4.4f" % (t2-t1)) 116 ToStdout("T:exec %4.4f" % (t3-t2)) 117 ToStdout("T:total %4.4f" % (t3-t1)) 118 return 0
119 120
121 -def TestAllocator(opts, args):
122 """Runs the test allocator opcode. 123 124 @param opts: the command line options selected by the user 125 @type args: list 126 @param args: should contain only one element, the iallocator name 127 @rtype: int 128 @return: the desired exit code 129 130 """ 131 try: 132 disks = [{"size": utils.ParseUnit(val), "mode": 'w'} 133 for val in opts.disks.split(",")] 134 except errors.UnitParseError, err: 135 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err) 136 return 1 137 138 nics = [val.split("/") for val in opts.nics.split(",")] 139 for row in nics: 140 while len(row) < 3: 141 row.append(None) 142 for i in range(3): 143 if row[i] == '': 144 row[i] = None 145 nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics] 146 147 if opts.tags is None: 148 opts.tags = [] 149 else: 150 opts.tags = opts.tags.split(",") 151 152 op = opcodes.OpTestAllocator(mode=opts.mode, 153 name=args[0], 154 evac_nodes=args, 155 mem_size=opts.mem, 156 disks=disks, 157 disk_template=opts.disk_template, 158 nics=nic_dict, 159 os=opts.os, 160 vcpus=opts.vcpus, 161 tags=opts.tags, 162 direction=opts.direction, 163 allocator=opts.iallocator, 164 ) 165 result = SubmitOpCode(op, opts=opts) 166 ToStdout("%s" % result) 167 return 0
168 169
170 -def _TestJobSubmission(opts):
171 """Tests submitting jobs. 172 173 """ 174 ToStdout("Testing job submission") 175 176 testdata = [ 177 (0, 0, constants.OP_PRIO_LOWEST), 178 (0, 0, constants.OP_PRIO_HIGHEST), 179 ] 180 181 for priority in (constants.OP_PRIO_SUBMIT_VALID | 182 frozenset([constants.OP_PRIO_LOWEST, 183 constants.OP_PRIO_HIGHEST])): 184 for offset in [-1, +1]: 185 testdata.extend([ 186 (0, 0, priority + offset), 187 (3, 0, priority + offset), 188 (0, 3, priority + offset), 189 (4, 2, priority + offset), 190 ]) 191 192 cl = cli.GetClient() 193 194 for before, after, failpriority in testdata: 195 ops = [] 196 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)]) 197 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority)) 198 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)]) 199 200 try: 201 cl.SubmitJob(ops) 202 except errors.GenericError, err: 203 if opts.debug: 204 ToStdout("Ignoring error: %s", err) 205 else: 206 raise errors.OpExecError("Submitting opcode with priority %s did not" 207 " fail when it should (allowed are %s)" % 208 (failpriority, constants.OP_PRIO_SUBMIT_VALID)) 209 210 jobs = [ 211 [opcodes.OpTestDelay(duration=0), 212 opcodes.OpTestDelay(duration=0, dry_run=False), 213 opcodes.OpTestDelay(duration=0, dry_run=True)], 214 ops, 215 ] 216 result = cl.SubmitManyJobs(jobs) 217 if not (len(result) == 2 and 218 compat.all(len(i) == 2 for i in result) and 219 compat.all(isinstance(i[1], basestring) for i in result) and 220 result[0][0] and not result[1][0]): 221 raise errors.OpExecError("Submitting multiple jobs did not work as" 222 " expected, result %s" % result) 223 assert len(result) == 2 224 225 ToStdout("Job submission tests were successful")
226 227
228 -class _JobQueueTestReporter(cli.StdioJobPollReportCb):
229 - def __init__(self):
230 """Initializes this class. 231 232 """ 233 cli.StdioJobPollReportCb.__init__(self) 234 self._expected_msgcount = 0 235 self._all_testmsgs = [] 236 self._testmsgs = None 237 self._job_id = None
238
239 - def GetTestMessages(self):
240 """Returns all test log messages received so far. 241 242 """ 243 return self._all_testmsgs
244
245 - def GetJobId(self):
246 """Returns the job ID. 247 248 """ 249 return self._job_id
250
251 - def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
252 """Handles a log message. 253 254 """ 255 if self._job_id is None: 256 self._job_id = job_id 257 elif self._job_id != job_id: 258 raise errors.ProgrammerError("The same reporter instance was used for" 259 " more than one job") 260 261 if log_type == constants.ELOG_JQUEUE_TEST: 262 (sockname, test, arg) = log_msg 263 return self._ProcessTestMessage(job_id, sockname, test, arg) 264 265 elif (log_type == constants.ELOG_MESSAGE and 266 log_msg.startswith(constants.JQT_MSGPREFIX)): 267 if self._testmsgs is None: 268 raise errors.OpExecError("Received test message without a preceding" 269 " start message") 270 testmsg = log_msg[len(constants.JQT_MSGPREFIX):] 271 self._testmsgs.append(testmsg) 272 self._all_testmsgs.append(testmsg) 273 return 274 275 return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial, 276 timestamp, log_type, 277 log_msg)
278
279 - def _ProcessTestMessage(self, job_id, sockname, test, arg):
280 """Handles a job queue test message. 281 282 """ 283 if test not in constants.JQT_ALL: 284 raise errors.OpExecError("Received invalid test message %s" % test) 285 286 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 287 try: 288 sock.settimeout(30.0) 289 290 logging.debug("Connecting to %s", sockname) 291 sock.connect(sockname) 292 293 logging.debug("Checking status") 294 jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0] 295 if not jobdetails: 296 raise errors.OpExecError("Can't find job %s" % job_id) 297 298 status = jobdetails[0] 299 300 logging.debug("Status of job %s is %s", job_id, status) 301 302 if test == constants.JQT_EXPANDNAMES: 303 if status != constants.JOB_STATUS_WAITLOCK: 304 raise errors.OpExecError("Job status while expanding names is '%s'," 305 " not '%s' as expected" % 306 (status, constants.JOB_STATUS_WAITLOCK)) 307 elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG): 308 if status != constants.JOB_STATUS_RUNNING: 309 raise errors.OpExecError("Job status while executing opcode is '%s'," 310 " not '%s' as expected" % 311 (status, constants.JOB_STATUS_RUNNING)) 312 313 if test == constants.JQT_STARTMSG: 314 logging.debug("Expecting %s test messages", arg) 315 self._testmsgs = [] 316 elif test == constants.JQT_LOGMSG: 317 if len(self._testmsgs) != arg: 318 raise errors.OpExecError("Received %s test messages when %s are" 319 " expected" % (len(self._testmsgs), arg)) 320 finally: 321 logging.debug("Closing socket") 322 sock.close()
323 324
325 -def TestJobqueue(opts, _):
326 """Runs a few tests on the job queue. 327 328 """ 329 _TestJobSubmission(opts) 330 331 (TM_SUCCESS, 332 TM_MULTISUCCESS, 333 TM_FAIL, 334 TM_PARTFAIL) = range(4) 335 TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL]) 336 337 for mode in TM_ALL: 338 test_messages = [ 339 "Testing mode %s" % mode, 340 "Hello World", 341 "A", 342 "", 343 "B" 344 "Foo|bar|baz", 345 utils.TimestampForFilename(), 346 ] 347 348 fail = mode in (TM_FAIL, TM_PARTFAIL) 349 350 if mode == TM_PARTFAIL: 351 ToStdout("Testing partial job failure") 352 ops = [ 353 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 354 log_messages=test_messages, fail=False), 355 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 356 log_messages=test_messages, fail=False), 357 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 358 log_messages=test_messages, fail=True), 359 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 360 log_messages=test_messages, fail=False), 361 ] 362 expect_messages = 3 * [test_messages] 363 expect_opstatus = [ 364 constants.OP_STATUS_SUCCESS, 365 constants.OP_STATUS_SUCCESS, 366 constants.OP_STATUS_ERROR, 367 constants.OP_STATUS_ERROR, 368 ] 369 expect_resultlen = 2 370 elif mode == TM_MULTISUCCESS: 371 ToStdout("Testing multiple successful opcodes") 372 ops = [ 373 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 374 log_messages=test_messages, fail=False), 375 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True, 376 log_messages=test_messages, fail=False), 377 ] 378 expect_messages = 2 * [test_messages] 379 expect_opstatus = [ 380 constants.OP_STATUS_SUCCESS, 381 constants.OP_STATUS_SUCCESS, 382 ] 383 expect_resultlen = 2 384 else: 385 if mode == TM_SUCCESS: 386 ToStdout("Testing job success") 387 expect_opstatus = [constants.OP_STATUS_SUCCESS] 388 elif mode == TM_FAIL: 389 ToStdout("Testing job failure") 390 expect_opstatus = [constants.OP_STATUS_ERROR] 391 else: 392 raise errors.ProgrammerError("Unknown test mode %s" % mode) 393 394 ops = [ 395 opcodes.OpTestJqueue(notify_waitlock=True, 396 notify_exec=True, 397 log_messages=test_messages, 398 fail=fail) 399 ] 400 expect_messages = [test_messages] 401 expect_resultlen = 1 402 403 cl = cli.GetClient() 404 cli.SetGenericOpcodeOpts(ops, opts) 405 406 # Send job to master daemon 407 job_id = cli.SendJob(ops, cl=cl) 408 409 reporter = _JobQueueTestReporter() 410 results = None 411 412 try: 413 results = cli.PollJob(job_id, cl=cl, reporter=reporter) 414 except errors.OpExecError, err: 415 if not fail: 416 raise 417 ToStdout("Ignoring error: %s", err) 418 else: 419 if fail: 420 raise errors.OpExecError("Job didn't fail when it should") 421 422 # Check length of result 423 if fail: 424 if results is not None: 425 raise errors.OpExecError("Received result from failed job") 426 elif len(results) != expect_resultlen: 427 raise errors.OpExecError("Received %s results (%s), expected %s" % 428 (len(results), results, expect_resultlen)) 429 430 # Check received log messages 431 all_messages = [i for j in expect_messages for i in j] 432 if reporter.GetTestMessages() != all_messages: 433 raise errors.OpExecError("Received test messages don't match input" 434 " (input %r, received %r)" % 435 (all_messages, reporter.GetTestMessages())) 436 437 # Check final status 438 reported_job_id = reporter.GetJobId() 439 if reported_job_id != job_id: 440 raise errors.OpExecError("Reported job ID %s doesn't match" 441 "submission job ID %s" % 442 (reported_job_id, job_id)) 443 444 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0] 445 if not jobdetails: 446 raise errors.OpExecError("Can't find job %s" % job_id) 447 448 if fail: 449 exp_status = constants.JOB_STATUS_ERROR 450 else: 451 exp_status = constants.JOB_STATUS_SUCCESS 452 453 (final_status, final_opstatus) = jobdetails 454 if final_status != exp_status: 455 raise errors.OpExecError("Final job status is %s, not %s as expected" % 456 (final_status, exp_status)) 457 if len(final_opstatus) != len(ops): 458 raise errors.OpExecError("Did not receive status for all opcodes (got %s," 459 " expected %s)" % 460 (len(final_opstatus), len(ops))) 461 if final_opstatus != expect_opstatus: 462 raise errors.OpExecError("Opcode status is %s, expected %s" % 463 (final_opstatus, expect_opstatus)) 464 465 ToStdout("Job queue test successful") 466 467 return 0
468 469
470 -def ListLocks(opts, args): # pylint: disable-msg=W0613
471 """List all locks. 472 473 @param opts: the command line options selected by the user 474 @type args: list 475 @param args: should be an empty list 476 @rtype: int 477 @return: the desired exit code 478 479 """ 480 selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS) 481 482 def _DashIfNone(fn): 483 def wrapper(value): 484 if not value: 485 return "-" 486 return fn(value) 487 return wrapper 488 489 def _FormatPending(value): 490 """Format pending acquires. 491 492 """ 493 return utils.CommaJoin("%s:%s" % (mode, ",".join(threads)) 494 for mode, threads in value) 495 496 # Format raw values 497 fmtoverride = { 498 "mode": (_DashIfNone(str), False), 499 "owner": (_DashIfNone(",".join), False), 500 "pending": (_DashIfNone(_FormatPending), False), 501 } 502 503 while True: 504 ret = GenericList(constants.QR_LOCK, selected_fields, None, None, 505 opts.separator, not opts.no_headers, 506 format_override=fmtoverride, verbose=opts.verbose) 507 508 if ret != constants.EXIT_SUCCESS: 509 return ret 510 511 if not opts.interval: 512 break 513 514 ToStdout("") 515 time.sleep(opts.interval) 516 517 return 0 518 519 520 commands = { 521 'delay': ( 522 Delay, [ArgUnknown(min=1, max=1)], 523 [cli_option("--no-master", dest="on_master", default=True, 524 action="store_false", help="Do not sleep in the master code"), 525 cli_option("-n", dest="on_nodes", default=[], 526 action="append", help="Select nodes to sleep on"), 527 cli_option("-r", "--repeat", type="int", default="0", dest="repeat", 528 help="Number of times to repeat the sleep"), 529 DRY_RUN_OPT, PRIORITY_OPT, 530 ], 531 "[opts...] <duration>", "Executes a TestDelay OpCode"), 532 'submit-job': ( 533 GenericOpCodes, [ArgFile(min=1)], 534 [VERBOSE_OPT, 535 cli_option("--op-repeat", type="int", default="1", dest="rep_op", 536 help="Repeat the opcode sequence this number of times"), 537 cli_option("--job-repeat", type="int", default="1", dest="rep_job", 538 help="Repeat the job this number of times"), 539 cli_option("--timing-stats", default=False, 540 action="store_true", help="Show timing stats"), 541 cli_option("--each", default=False, action="store_true", 542 help="Submit each job separately"), 543 DRY_RUN_OPT, PRIORITY_OPT, 544 ], 545 "<op_list_file...>", "Submits jobs built from json files" 546 " containing a list of serialized opcodes"), 547 'allocator': ( 548 TestAllocator, [ArgUnknown(min=1)], 549 [cli_option("--dir", dest="direction", 550 default="in", choices=["in", "out"], 551 help="Show allocator input (in) or allocator" 552 " results (out)"), 553 IALLOCATOR_OPT, 554 cli_option("-m", "--mode", default="relocate", 555 choices=["relocate", "allocate", "multi-evacuate"], 556 help="Request mode, either allocate or relocate"), 557 cli_option("--mem", default=128, type="unit", 558 help="Memory size for the instance (MiB)"), 559 cli_option("--disks", default="4096,4096", 560 help="Comma separated list of disk sizes (MiB)"), 561 DISK_TEMPLATE_OPT, 562 cli_option("--nics", default="00:11:22:33:44:55", 563 help="Comma separated list of nics, each nic" 564 " definition is of form mac/ip/bridge, if" 565 " missing values are replace by None"), 566 OS_OPT, 567 cli_option("-p", "--vcpus", default=1, type="int", 568 help="Select number of VCPUs for the instance"), 569 cli_option("--tags", default=None, 570 help="Comma separated list of tags"), 571 DRY_RUN_OPT, PRIORITY_OPT, 572 ], 573 "{opts...} <instance>", "Executes a TestAllocator OpCode"), 574 "test-jobqueue": ( 575 TestJobqueue, ARGS_NONE, [PRIORITY_OPT], 576 "", "Test a few aspects of the job queue"), 577 "locks": ( 578 ListLocks, ARGS_NONE, 579 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT], 580 "[--interval N]", "Show a list of locks in the master daemon"), 581 } 582 583
584 -def Main():
585 return GenericMain(commands)
586