Package ganeti :: Package client :: Module gnt_debug
[hide private]
[frames | no frames]

Source Code for Module ganeti.client.gnt_debug

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """Debugging commands""" 
 22   
 23  # pylint: disable-msg=W0401,W0614,C0103 
 24  # W0401: Wildcard import ganeti.cli 
 25  # W0614: Unused import %s from wildcard import (since we need cli) 
 26  # C0103: Invalid name gnt-backup 
 27   
 28  import simplejson 
 29  import time 
 30  import socket 
 31  import logging 
 32   
 33  from ganeti.cli import * 
 34  from ganeti import cli 
 35  from ganeti import constants 
 36  from ganeti import opcodes 
 37  from ganeti import utils 
 38  from ganeti import errors 
 39  from ganeti import compat 
 40   
 41   
 42  #: Default fields for L{ListLocks} 
 43  _LIST_LOCKS_DEF_FIELDS = [ 
 44    "name", 
 45    "mode", 
 46    "owner", 
 47    "pending", 
 48    ] 
 49   
 50   
def Delay(opts, args):
  """Submits an L{opcodes.OpTestDelay} opcode built from the command line.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the duration of
      the sleep
  @rtype: int
  @return: the desired exit code

  """
  delay_params = {
    "duration": float(args[0]),
    "on_master": opts.on_master,
    "on_nodes": opts.on_nodes,
    "repeat": opts.repeat,
    }

  # pylint: disable-msg=W0142
  SubmitOpCode(opcodes.OpTestDelay(**delay_params), opts=opts)

  return 0
70 71
def GenericOpCodes(opts, args):
  """Submits jobs built from files containing serialized opcodes.

  Every file is loaded once per job repetition; its opcode list is
  repeated C{opts.rep_op} times and queued as a single job. Once all
  jobs are queued they are submitted and their results collected. If
  timing statistics were requested, counters and timings are printed
  at the end.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the paths of the files with the opcode definitions
  @rtype: int
  @return: the desired exit code

  """
  executor = cli.JobExecutor(cl=cli.GetClient(), verbose=opts.verbose,
                             opts=opts)
  with_timing = opts.timing_stats

  total_jobs = 0
  total_ops = 0

  if with_timing:
    ToStdout("Loading...")

  for repetition in range(opts.rep_job):
    for path in args:
      serialized = simplejson.loads(utils.ReadFile(path))
      job_ops = ([opcodes.OpCode.LoadOpCode(entry) for entry in serialized] *
                 opts.rep_op)
      # pylint: disable-msg=W0142
      executor.QueueJob("file %s/%d" % (path, repetition), *job_ops)
      total_ops += len(job_ops)
      total_jobs += 1

  if with_timing:
    submit_start = time.time()
    ToStdout("Submitting...")

  executor.SubmitPending(each=opts.each)

  if with_timing:
    exec_start = time.time()
    ToStdout("Executing...")

  executor.GetResults()

  if with_timing:
    exec_end = time.time()
    report = [
      "C:op %4d" % total_ops,
      "C:job %4d" % total_jobs,
      "T:submit %4.4f" % (exec_start - submit_start),
      "T:exec %4.4f" % (exec_end - exec_start),
      "T:total %4.4f" % (exec_end - submit_start),
      ]
    for line in report:
      ToStdout(line)

  return 0
119 120
121 -def TestAllocator(opts, args):
122 """Runs the test allocator opcode. 123 124 @param opts: the command line options selected by the user 125 @type args: list 126 @param args: should contain only one element, the iallocator name 127 @rtype: int 128 @return: the desired exit code 129 130 """ 131 try: 132 disks = [{"size": utils.ParseUnit(val), "mode": 'w'} 133 for val in opts.disks.split(",")] 134 except errors.UnitParseError, err: 135 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err) 136 return 1 137 138 nics = [val.split("/") for val in opts.nics.split(",")] 139 for row in nics: 140 while len(row) < 3: 141 row.append(None) 142 for i in range(3): 143 if row[i] == '': 144 row[i] = None 145 nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics] 146 147 if opts.tags is None: 148 opts.tags = [] 149 else: 150 opts.tags = opts.tags.split(",") 151 152 op = opcodes.OpTestAllocator(mode=opts.mode, 153 name=args[0], 154 evac_nodes=args, 155 mem_size=opts.mem, 156 disks=disks, 157 disk_template=opts.disk_template, 158 nics=nic_dict, 159 os=opts.os, 160 vcpus=opts.vcpus, 161 tags=opts.tags, 162 direction=opts.direction, 163 allocator=opts.iallocator, 164 ) 165 result = SubmitOpCode(op, opts=opts) 166 ToStdout("%s" % result) 167 return 0
168 169
170 -def _TestJobSubmission(opts):
171 """Tests submitting jobs. 172 173 """ 174 ToStdout("Testing job submission") 175 176 testdata = [ 177 (0, 0, constants.OP_PRIO_LOWEST), 178 (0, 0, constants.OP_PRIO_HIGHEST), 179 ] 180 181 for priority in (constants.OP_PRIO_SUBMIT_VALID | 182 frozenset([constants.OP_PRIO_LOWEST, 183 constants.OP_PRIO_HIGHEST])): 184 for offset in [-1, +1]: 185 testdata.extend([ 186 (0, 0, priority + offset), 187 (3, 0, priority + offset), 188 (0, 3, priority + offset), 189 (4, 2, priority + offset), 190 ]) 191 192 cl = cli.GetClient() 193 194 for before, after, failpriority in testdata: 195 ops = [] 196 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)]) 197 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority)) 198 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)]) 199 200 try: 201 cl.SubmitJob(ops) 202 except errors.GenericError, err: 203 if opts.debug: 204 ToStdout("Ignoring error: %s", err) 205 else: 206 raise errors.OpExecError("Submitting opcode with priority %s did not" 207 " fail when it should (allowed are %s)" % 208 (failpriority, constants.OP_PRIO_SUBMIT_VALID)) 209 210 jobs = [ 211 [opcodes.OpTestDelay(duration=0), 212 opcodes.OpTestDelay(duration=0, dry_run=False), 213 opcodes.OpTestDelay(duration=0, dry_run=True)], 214 ops, 215 ] 216 result = cl.SubmitManyJobs(jobs) 217 if not (len(result) == 2 and 218 compat.all(len(i) == 2 for i in result) and 219 compat.all(isinstance(i[1], basestring) for i in result) and 220 result[0][0] and not result[1][0]): 221 raise errors.OpExecError("Submitting multiple jobs did not work as" 222 " expected, result %s" % result) 223 assert len(result) == 2 224 225 ToStdout("Job submission tests were successful")
226 227
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter used by the job queue tests.

  Job queue test notifications and test log messages are intercepted,
  checked and recorded; every other log message is handed to
  L{cli.StdioJobPollReportCb}.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    self._expected_msgcount = 0
    # Every test log message received by this reporter
    self._all_testmsgs = []
    # Test log messages received since the last start notification; None
    # as long as no start notification was seen
    self._testmsgs = None
    # ID of the job this reporter is bound to, set on the first message
    self._job_id = None

  def GetTestMessages(self):
    """Returns all test log messages received so far.

    """
    return self._all_testmsgs

  def GetJobId(self):
    """Returns the job ID.

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    @param job_id: the ID of the job which emitted the message
    @param serial: passed through to the parent class
    @param timestamp: passed through to the parent class
    @param log_type: the message type; job queue test notifications and
        prefixed test log messages are handled here
    @param log_msg: the message payload
    @raise errors.ProgrammerError: if messages of more than one job are
        reported to the same instance
    @raise errors.OpExecError: if a test log message arrives without a
        preceding start notification

    """
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    elif (log_type == constants.ELOG_MESSAGE and
          log_msg.startswith(constants.JQT_MSGPREFIX)):
      if self._testmsgs is None:
        raise errors.OpExecError("Received test message without a preceding"
                                 " start message")
      # Record the message without its prefix
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
      self._testmsgs.append(testmsg)
      self._all_testmsgs.append(testmsg)
      return

    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Handles a job queue test message.

    Connects to the Unix socket named in the notification, verifies the
    current job status against the kind of notification and keeps track
    of the test log message bookkeeping. The socket is always closed
    before returning.

    @param job_id: the ID of the job which sent the notification
    @param sockname: path of the Unix socket to connect to
    @param test: the kind of notification, one of L{constants.JQT_ALL}
    @param arg: notification argument; for C{JQT_LOGMSG} the number of
        test log messages that should have been received
    @raise errors.OpExecError: if the notification is invalid or the
        observed state is not the expected one

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      if test == constants.JQT_EXPANDNAMES:
        if status != constants.JOB_STATUS_WAITLOCK:
          raise errors.OpExecError("Job status while expanding names is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_WAITLOCK))
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        if status != constants.JOB_STATUS_RUNNING:
          raise errors.OpExecError("Job status while executing opcode is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_RUNNING))

      if test == constants.JQT_STARTMSG:
        logging.debug("Expecting %s test messages", arg)
        self._testmsgs = []
      elif test == constants.JQT_LOGMSG:
        # Without a start notification there is no list to count; report a
        # proper test failure instead of failing on len(None)
        if self._testmsgs is None:
          raise errors.OpExecError("Received test message count without a"
                                   " preceding start message")
        if len(self._testmsgs) != arg:
          raise errors.OpExecError("Received %s test messages when %s are"
                                   " expected" % (len(self._testmsgs), arg))
    finally:
      logging.debug("Closing socket")
      sock.close()
323 324
325 -def TestJobqueue(opts, _):
326 """Runs a few tests on the job queue. 327 328 """ 329 _TestJobSubmission(opts) 330 331 (TM_SUCCESS, 332 TM_MULTISUCCESS, 333 TM_FAIL, 334 TM_PARTFAIL) = range(4) 335 TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL]) 336 337 for mode in TM_ALL: 338 test_messages = [ 339 "Testing mode %s" % mode, 340 "Hello World", 341 "A", 342 "", 343 "B" 344 "Foo|bar|baz", 345 utils.TimestampForFilename(), 346 ] 347 348 fail = mode in (TM_FAIL, TM_PARTFAIL) 349 350 if mode == TM_PARTFAIL: 351 ToStdout("Testing partial job failure") 352 ops = [ 353 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 354 log_messages=test_messages, fail=False), 355 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 356 log_messages=test_messages, fail=False), 357 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 358 log_messages=test_messages, fail=True), 359 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 360 log_messages=test_messages, fail=False), 361 ] 362 expect_messages = 3 * [test_messages] 363 expect_opstatus = [ 364 constants.OP_STATUS_SUCCESS, 365 constants.OP_STATUS_SUCCESS, 366 constants.OP_STATUS_ERROR, 367 constants.OP_STATUS_ERROR, 368 ] 369 expect_resultlen = 2 370 elif mode == TM_MULTISUCCESS: 371 ToStdout("Testing multiple successful opcodes") 372 ops = [ 373 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 374 log_messages=test_messages, fail=False), 375 opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True, 376 log_messages=test_messages, fail=False), 377 ] 378 expect_messages = 2 * [test_messages] 379 expect_opstatus = [ 380 constants.OP_STATUS_SUCCESS, 381 constants.OP_STATUS_SUCCESS, 382 ] 383 expect_resultlen = 2 384 else: 385 if mode == TM_SUCCESS: 386 ToStdout("Testing job success") 387 expect_opstatus = [constants.OP_STATUS_SUCCESS] 388 elif mode == TM_FAIL: 389 ToStdout("Testing job failure") 390 expect_opstatus = [constants.OP_STATUS_ERROR] 391 else: 392 raise 
errors.ProgrammerError("Unknown test mode %s" % mode) 393 394 ops = [ 395 opcodes.OpTestJobqueue(notify_waitlock=True, 396 notify_exec=True, 397 log_messages=test_messages, 398 fail=fail) 399 ] 400 expect_messages = [test_messages] 401 expect_resultlen = 1 402 403 cl = cli.GetClient() 404 cli.SetGenericOpcodeOpts(ops, opts) 405 406 # Send job to master daemon 407 job_id = cli.SendJob(ops, cl=cl) 408 409 reporter = _JobQueueTestReporter() 410 results = None 411 412 try: 413 results = cli.PollJob(job_id, cl=cl, reporter=reporter) 414 except errors.OpExecError, err: 415 if not fail: 416 raise 417 ToStdout("Ignoring error: %s", err) 418 else: 419 if fail: 420 raise errors.OpExecError("Job didn't fail when it should") 421 422 # Check length of result 423 if fail: 424 if results is not None: 425 raise errors.OpExecError("Received result from failed job") 426 elif len(results) != expect_resultlen: 427 raise errors.OpExecError("Received %s results (%s), expected %s" % 428 (len(results), results, expect_resultlen)) 429 430 # Check received log messages 431 all_messages = [i for j in expect_messages for i in j] 432 if reporter.GetTestMessages() != all_messages: 433 raise errors.OpExecError("Received test messages don't match input" 434 " (input %r, received %r)" % 435 (all_messages, reporter.GetTestMessages())) 436 437 # Check final status 438 reported_job_id = reporter.GetJobId() 439 if reported_job_id != job_id: 440 raise errors.OpExecError("Reported job ID %s doesn't match" 441 "submission job ID %s" % 442 (reported_job_id, job_id)) 443 444 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0] 445 if not jobdetails: 446 raise errors.OpExecError("Can't find job %s" % job_id) 447 448 if fail: 449 exp_status = constants.JOB_STATUS_ERROR 450 else: 451 exp_status = constants.JOB_STATUS_SUCCESS 452 453 (final_status, final_opstatus) = jobdetails 454 if final_status != exp_status: 455 raise errors.OpExecError("Final job status is %s, not %s as expected" % 
456 (final_status, exp_status)) 457 if len(final_opstatus) != len(ops): 458 raise errors.OpExecError("Did not receive status for all opcodes (got %s," 459 " expected %s)" % 460 (len(final_opstatus), len(ops))) 461 if final_opstatus != expect_opstatus: 462 raise errors.OpExecError("Opcode status is %s, expected %s" % 463 (final_opstatus, expect_opstatus)) 464 465 ToStdout("Job queue test successful") 466 467 return 0
468 469
def _FormatLockField(field, val):
  """Converts one raw lock field value into its display string.

  """
  if field in ("mode", "owner", "pending") and not val:
    val = "-"
  elif field == "owner":
    val = ",".join(val)
  elif field == "pending":
    val = utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
                          for mode, threads in val)

  return str(val)


def ListLocks(opts, args): # pylint: disable-msg=W0613
  """List all locks.

  Queries the locks, prints them as a table and, if an interval was
  given, repeats this after sleeping for that interval.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

  headers = None
  if not opts.no_headers:
    headers = {
      "name": "Name",
      "mode": "Mode",
      "owner": "Owner",
      "pending": "Pending",
      }

  while True:
    # Not reusing client as interval might be too long
    output = GetClient().QueryLocks(selected_fields, False)

    # change raw values to nicer strings
    for row in output:
      for (idx, field) in enumerate(selected_fields):
        row[idx] = _FormatLockField(field, row[idx])

    table = GenerateTable(separator=opts.separator, headers=headers,
                          fields=selected_fields, data=output)
    for line in table:
      ToStdout(line)

    if not opts.interval:
      return 0

    ToStdout("")
    time.sleep(opts.interval)


#: Sub-command table passed to GenericMain; each value holds the handler
#: function, the argument definition, the option list, the usage string
#: and the description
commands = {
  'delay': (
    Delay, [ArgUnknown(min=1, max=1)],
    [cli_option("--no-master", dest="on_master", default=True,
                action="store_false", help="Do not sleep in the master code"),
     cli_option("-n", dest="on_nodes", default=[],
                action="append", help="Select nodes to sleep on"),
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
                help="Number of times to repeat the sleep"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
  'submit-job': (
    GenericOpCodes, [ArgFile(min=1)],
    [VERBOSE_OPT,
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
                help="Repeat the opcode sequence this number of times"),
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
                help="Repeat the job this number of times"),
     cli_option("--timing-stats", default=False,
                action="store_true", help="Show timing stats"),
     cli_option("--each", default=False, action="store_true",
                help="Submit each job separately"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "<op_list_file...>", "Submits jobs built from json files"
    " containing a list of serialized opcodes"),
  'allocator': (
    TestAllocator, [ArgUnknown(min=1)],
    [cli_option("--dir", dest="direction",
                default="in", choices=["in", "out"],
                help="Show allocator input (in) or allocator"
                " results (out)"),
     IALLOCATOR_OPT,
     cli_option("-m", "--mode", default="relocate",
                choices=["relocate", "allocate", "multi-evacuate"],
                help="Request mode, either allocate or relocate"),
     cli_option("--mem", default=128, type="unit",
                help="Memory size for the instance (MiB)"),
     cli_option("--disks", default="4096,4096",
                help="Comma separated list of disk sizes (MiB)"),
     DISK_TEMPLATE_OPT,
     cli_option("--nics", default="00:11:22:33:44:55",
                help="Comma separated list of nics, each nic"
                " definition is of form mac/ip/bridge, if"
                " missing values are replace by None"),
     OS_OPT,
     cli_option("-p", "--vcpus", default=1, type="int",
                help="Select number of VCPUs for the instance"),
     cli_option("--tags", default=None,
                help="Comma separated list of tags"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
  "test-jobqueue": (
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
    "", "Test a few aspects of the job queue"),
  "locks": (
    ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
    "[--interval N]", "Show a list of locks in the master daemon"),
  }
def Main():
  """Runs the sub-command selected on the command line.

  @return: whatever L{GenericMain} returns for the L{commands} table

  """
  exit_code = GenericMain(commands)
  return exit_code
590