1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Debugging commands"""
22
23
24
25
26
27
28 import simplejson
29 import time
30 import socket
31 import logging
32
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
40 from ganeti import ht
41
42
43
44 _LIST_LOCKS_DEF_FIELDS = [
45 "name",
46 "mode",
47 "owner",
48 "pending",
49 ]
def Delay(opts, args):
  """Sleeps for a while.

  Submits a single OpTestDelay opcode built from the command line.

  @param opts: the command line options selected by the user; the
      C{on_master}, C{on_nodes} and C{repeat} attributes are forwarded
      to the opcode
  @type args: list
  @param args: should contain only one element, the duration of the
      sleep (converted with C{float})
  @rtype: int
  @return: the desired exit code

  """
  delay = float(args[0])
  op = opcodes.OpTestDelay(duration=delay,
                           on_master=opts.on_master,
                           on_nodes=opts.on_nodes,
                           repeat=opts.repeat)
  SubmitOpCode(op, opts=opts)

  return 0
def GenericOpCodes(opts, args):
  """Send any opcode to the master.

  Every file named in C{args} must contain a JSON list of serialized
  opcodes. For each of the C{opts.rep_job} repetitions one job per file
  is queued, holding that file's opcode list repeated C{opts.rep_op}
  times. All queued jobs are then submitted and their results collected.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the paths of the files with the opcode definitions
  @rtype: int
  @return: the desired exit code

  """
  cl = cli.GetClient()
  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)

  job_cnt = 0
  op_cnt = 0
  if opts.timing_stats:
    ToStdout("Loading...")
  for job_idx in range(opts.rep_job):
    for fname in args:
      # The file is re-read and re-parsed for every job, so each job gets
      # its own opcode objects
      op_data = simplejson.loads(utils.ReadFile(fname))
      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
      # List multiplication repeats references to the same opcode objects
      op_list = op_list * opts.rep_op
      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
      op_cnt += len(op_list)
      job_cnt += 1

  # Timestamps are always taken so they are defined unconditionally; they
  # are only reported when timing statistics were requested
  t1 = time.time()
  if opts.timing_stats:
    ToStdout("Submitting...")

  jex.SubmitPending(each=opts.each)

  t2 = time.time()
  if opts.timing_stats:
    ToStdout("Executing...")

  jex.GetResults()

  t3 = time.time()
  if opts.timing_stats:
    ToStdout("C:op %4d" % op_cnt)
    ToStdout("C:job %4d" % job_cnt)
    ToStdout("T:submit %4.4f" % (t2 - t1))
    ToStdout("T:exec %4.4f" % (t3 - t2))
    ToStdout("T:total %4.4f" % (t3 - t1))
  return 0
def TestAllocator(opts, args):
  """Runs the test allocator opcode.

  @param opts: the command line options selected by the user; C{disks},
      C{nics} and C{tags} are comma-separated strings parsed here
  @type args: list
  @param args: the instance name(s); the first element is passed as the
      opcode's C{name}, the whole list as C{instances}
  @rtype: int
  @return: the desired exit code (1 if the disk sizes cannot be parsed)

  """
  try:
    disks = [{
      constants.IDISK_SIZE: utils.ParseUnit(val),
      constants.IDISK_MODE: constants.DISK_RDWR,
      } for val in opts.disks.split(",")]
  except errors.UnitParseError as err:
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
    return 1

  # Each NIC is given as "mac/ip/bridge"; missing or empty fields become
  # None and any fields beyond the third are ignored
  nics = []
  for spec in opts.nics.split(","):
    fields = (spec.split("/") + [None, None, None])[:3]
    nics.append([field or None for field in fields])

  nic_dict = [{
    constants.INIC_MAC: mac,
    constants.INIC_IP: ip,
    # NOTE(review): the bridge uses a literal "bridge" key rather than a
    # constants.INIC_* name; confirm this is what OpTestAllocator expects
    "bridge": bridge,
    } for (mac, ip, bridge) in nics]

  # Parse into a local instead of overwriting opts.tags, so the options
  # object is left untouched
  if opts.tags is None:
    tags = []
  else:
    tags = opts.tags.split(",")

  if opts.target_groups is None:
    target_groups = []
  else:
    target_groups = opts.target_groups

  op = opcodes.OpTestAllocator(mode=opts.mode,
                               name=args[0],
                               instances=args,
                               memory=opts.memory,
                               disks=disks,
                               disk_template=opts.disk_template,
                               nics=nic_dict,
                               os=opts.os,
                               vcpus=opts.vcpus,
                               tags=tags,
                               direction=opts.direction,
                               allocator=opts.iallocator,
                               evac_mode=opts.evac_mode,
                               target_groups=target_groups)
  result = SubmitOpCode(op, opts=opts)
  ToStdout("%s" % result)
  return 0
def _TestJobDependency(opts):
  """Tests job dependencies.

  First checks that submitting a plain opcode with a relative job ID
  dependency fails. Then submits five jobs with relative dependencies and
  verifies that all of them succeed, that the relative dependencies were
  resolved to the correct absolute job IDs and that the jobs ran in an
  order consistent with their dependencies.

  @param opts: the command line options selected by the user
  @raise errors.OpExecError: if any of the checks fails

  """
  ToStdout("Testing job dependencies")

  cl = cli.GetClient()

  # Submitting a plain opcode with a relative job ID is expected to fail
  try:
    SubmitOpCode(opcodes.OpTestDelay(duration=0, depends=[(-1, None)]), cl=cl)
  except errors.GenericError as err:
    if opts.debug:
      ToStdout("Ignoring error: %s", err)
  else:
    raise errors.OpExecError("Submitting plain opcode with relative job ID"
                             " did not fail as expected")

  # Relative dependencies: -1 refers to the previous job of this
  # submission, -2 to the one before it (see the checks further down)
  jobs = [
    [opcodes.OpTestDelay(duration=1)],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-1, [])])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
    ]

  # The index-based checks below rely on exactly five single-opcode jobs
  assert len(jobs) == 5 and compat.all(len(ops) == 1 for ops in jobs)

  # Every submission result must be a (bool, job ID or message) pair
  check_fn = ht.TListOf(ht.TAnd(ht.TIsLength(2),
                                ht.TItems([ht.TBool,
                                           ht.TOr(ht.TNonEmptyString,
                                                  ht.TJobId)])))

  result = cl.SubmitManyJobs(jobs)
  if not check_fn(result):
    raise errors.OpExecError("Job submission doesn't match %s: %s" %
                             (check_fn, result))

  # Wait for all submitted jobs and make sure every one succeeded
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result:
    jex.AddJobId(None, status, job_id)

  job_results = jex.GetResults()
  if not compat.all(row[0] for row in job_results):
    raise errors.OpExecError("At least one of the submitted jobs failed: %s" %
                             job_results)

  # Fetch the job IDs, the "opexec" values (compared below to check the
  # execution order) and the opcodes as stored by the job queue
  data = cl.QueryJobs([job_id for (_, job_id) in result],
                      ["id", "opexec", "ops"])
  data_job_id = [job_id for (job_id, _, _) in data]
  data_opexec = [opexec for (_, opexec, _) in data]
  data_op = [[opcodes.OpCode.LoadOpCode(op) for op in ops]
             for (_, _, ops) in data]

  # This validates data returned by the master, so use an explicit check
  # rather than an assert (which is stripped under -O)
  if not compat.all(not op.depends or len(op.depends) == 1
                    for ops in data_op
                    for op in ops):
    raise errors.OpExecError("Expected at most one dependency per opcode:"
                             " %s" % data)

  # Relative job IDs must have been resolved to absolute ones
  for (job_idx, res_jobdep) in [(1, data_job_id[0]),
                                (2, data_job_id[0]),
                                (4, data_job_id[2])]:
    if data_op[job_idx][0].depends[0][0] != res_jobdep:
      raise errors.OpExecError("Job %s's opcode doesn't depend on correct job"
                               " ID (%s)" % (job_idx, res_jobdep))

  # Dependent jobs must not have run before the jobs they depend on
  if not (data_opexec[0] <= data_opexec[1] and
          data_opexec[0] <= data_opexec[2] and
          data_opexec[2] <= data_opexec[4]):
    raise errors.OpExecError("Jobs did not run in correct order: %s" % data)

  ToStdout("Job dependency tests were successful")
def _TestJobSubmission(opts):
  """Tests submitting jobs.

  Every entry of C{testdata} describes a job containing one opcode with a
  priority that must be rejected on submission. Afterwards a valid job and
  one of those invalid jobs are submitted together with C{SubmitManyJobs};
  only the first one may be accepted.

  @param opts: the command line options selected by the user
  @raise errors.OpExecError: if a submission does not behave as expected

  """
  ToStdout("Testing job submission")

  # Entries are (opcodes before, opcodes after, priority of middle opcode)
  testdata = [
    (0, 0, constants.OP_PRIO_LOWEST),
    (0, 0, constants.OP_PRIO_HIGHEST),
    ]

  for priority in (constants.OP_PRIO_SUBMIT_VALID |
                   frozenset([constants.OP_PRIO_LOWEST,
                              constants.OP_PRIO_HIGHEST])):
    for offset in [-1, +1]:
      testdata.extend([
        (0, 0, priority + offset),
        (3, 0, priority + offset),
        (0, 3, priority + offset),
        (4, 2, priority + offset),
        ])

  def _BuildOps(before, after, priority):
    """Builds the opcode list for one job.

    The opcode with the given priority is surrounded by C{before} and
    C{after} opcodes without an explicit priority.

    """
    return ([opcodes.OpTestDelay(duration=0) for _ in range(before)] +
            [opcodes.OpTestDelay(duration=0, priority=priority)] +
            [opcodes.OpTestDelay(duration=0) for _ in range(after)])

  cl = cli.GetClient()

  for (before, after, failpriority) in testdata:
    ops = _BuildOps(before, after, failpriority)

    try:
      cl.SubmitJob(ops)
    except errors.GenericError as err:
      if opts.debug:
        ToStdout("Ignoring error: %s", err)
    else:
      raise errors.OpExecError("Submitting opcode with priority %s did not"
                               " fail when it should (allowed are %s)" %
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))

  # The first job is valid, the second one is the last invalid job from
  # above (built explicitly instead of relying on the loop variable)
  jobs = [
    [opcodes.OpTestDelay(duration=0),
     opcodes.OpTestDelay(duration=0, dry_run=False),
     opcodes.OpTestDelay(duration=0, dry_run=True)],
    _BuildOps(*testdata[-1]),
    ]
  result = cl.SubmitManyJobs(jobs)
  if not (len(result) == 2 and
          compat.all(len(i) == 2 for i in result) and
          compat.all(isinstance(i[1], basestring) for i in result) and
          result[0][0] and not result[1][0]):
    raise errors.OpExecError("Submitting multiple jobs did not work as"
                             " expected, result %s" % result)

  ToStdout("Job submission tests were successful")
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter used by the job queue tests.

  Checks job queue test notifications against the job's status and
  records test log messages (those prefixed with
  C{constants.JQT_MSGPREFIX}). Every other log message is passed on to
  L{cli.StdioJobPollReportCb}. An instance must only be used for a single
  job.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    # All test messages received so far, across start messages
    self._all_testmsgs = []
    # Test messages since the last start message; None until one arrives
    self._testmsgs = None
    # ID of the job this reporter is used for, set by the first message
    self._job_id = None

  def GetTestMessages(self):
    """Returns all test log messages received so far.

    @rtype: list
    @return: the internal list of messages (not a copy)

    """
    return self._all_testmsgs

  def GetJobId(self):
    """Returns the job ID.

    @return: the job ID seen in the first log message, or None if no
        message was received yet

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    @raise errors.ProgrammerError: if messages for more than one job are
        received
    @raise errors.OpExecError: if a test message arrives before a start
        message

    """
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    elif (log_type == constants.ELOG_MESSAGE and
          log_msg.startswith(constants.JQT_MSGPREFIX)):
      if self._testmsgs is None:
        raise errors.OpExecError("Received test message without a preceding"
                                 " start message")
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
      self._testmsgs.append(testmsg)
      self._all_testmsgs.append(testmsg)
      return

    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Handles a job queue test message.

    Connects to the UNIX socket C{sockname}, checks the job's status
    against what is expected for C{test} and closes the socket again.

    @raise errors.OpExecError: on an unknown test, an unexpected job
        status or an unexpected number of test messages

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    # NOTE(review): the socket is only connected and closed, never read or
    # written; presumably the job waits on it -- confirm against the job
    # queue test opcode
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      if test == constants.JQT_EXPANDNAMES:
        if status != constants.JOB_STATUS_WAITING:
          raise errors.OpExecError("Job status while expanding names is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_WAITING))
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        if status != constants.JOB_STATUS_RUNNING:
          raise errors.OpExecError("Job status while executing opcode is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_RUNNING))

      if test == constants.JQT_STARTMSG:
        logging.debug("Expecting %s test messages", arg)
        self._testmsgs = []
      elif test == constants.JQT_LOGMSG:
        # Without a preceding start message there is no list to count;
        # report that explicitly instead of failing with a TypeError
        if self._testmsgs is None:
          raise errors.OpExecError("Received test message count check"
                                   " without a preceding start message")
        if len(self._testmsgs) != arg:
          raise errors.OpExecError("Received %s test messages when %s are"
                                   " expected" % (len(self._testmsgs), arg))
    finally:
      logging.debug("Closing socket")
      sock.close()
def TestJobqueue(opts, _):
  """Runs a few tests on the job queue.

  After the job submission and job dependency tests, one job is run for
  each test mode (success, multiple successful opcodes, failure, partial
  failure). For each job the result, the received test log messages, the
  reported job ID and the final job and opcode status are checked.

  @param opts: the command line options selected by the user
  @rtype: int
  @return: the desired exit code
  @raise errors.OpExecError: if any of the checks fails

  """
  _TestJobSubmission(opts)
  _TestJobDependency(opts)

  (TM_SUCCESS,
   TM_MULTISUCCESS,
   TM_FAIL,
   TM_PARTFAIL) = range(4)
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])

  for mode in TM_ALL:
    # Every entry must be a separate message; a missing comma would make
    # Python concatenate adjacent string literals into one
    test_messages = [
      "Testing mode %s" % mode,
      "Hello World",
      "A",
      "",
      "B",
      "Foo|bar|baz",
      utils.TimestampForFilename(),
      ]

    fail = mode in (TM_FAIL, TM_PARTFAIL)

    # op_fails holds the "fail" flag of each opcode in the job
    if mode == TM_PARTFAIL:
      ToStdout("Testing partial job failure")
      op_fails = [False, False, True, False]
      # Only the opcodes up to and including the failing one log messages
      expect_messages = 3 * [test_messages]
      expect_opstatus = [
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_ERROR,
        constants.OP_STATUS_ERROR,
        ]
      expect_resultlen = 2
    elif mode == TM_MULTISUCCESS:
      ToStdout("Testing multiple successful opcodes")
      op_fails = [False, False]
      expect_messages = 2 * [test_messages]
      expect_opstatus = [
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        ]
      expect_resultlen = 2
    else:
      if mode == TM_SUCCESS:
        ToStdout("Testing job success")
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
      elif mode == TM_FAIL:
        ToStdout("Testing job failure")
        expect_opstatus = [constants.OP_STATUS_ERROR]
      else:
        raise errors.ProgrammerError("Unknown test mode %s" % mode)

      op_fails = [fail]
      expect_messages = [test_messages]
      expect_resultlen = 1

    ops = [opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
                                log_messages=test_messages, fail=op_fail)
           for op_fail in op_fails]

    cl = cli.GetClient()
    cli.SetGenericOpcodeOpts(ops, opts)

    job_id = cli.SendJob(ops, cl=cl)

    reporter = _JobQueueTestReporter()
    results = None

    try:
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
    except errors.OpExecError as err:
      if not fail:
        raise
      ToStdout("Ignoring error: %s", err)
    else:
      if fail:
        raise errors.OpExecError("Job didn't fail when it should")

    # Check the job result
    if fail:
      if results is not None:
        raise errors.OpExecError("Received result from failed job")
    elif len(results) != expect_resultlen:
      raise errors.OpExecError("Received %s results (%s), expected %s" %
                               (len(results), results, expect_resultlen))

    # Check the received test log messages
    all_messages = [i for j in expect_messages for i in j]
    if reporter.GetTestMessages() != all_messages:
      raise errors.OpExecError("Received test messages don't match input"
                               " (input %r, received %r)" %
                               (all_messages, reporter.GetTestMessages()))

    # Check the job ID seen by the reporter
    reported_job_id = reporter.GetJobId()
    if reported_job_id != job_id:
      raise errors.OpExecError("Reported job ID %s doesn't match"
                               " submission job ID %s" %
                               (reported_job_id, job_id))

    # Check the final job and opcode status
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
    if not jobdetails:
      raise errors.OpExecError("Can't find job %s" % job_id)

    if fail:
      exp_status = constants.JOB_STATUS_ERROR
    else:
      exp_status = constants.JOB_STATUS_SUCCESS

    (final_status, final_opstatus) = jobdetails
    if final_status != exp_status:
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
                               (final_status, exp_status))
    if len(final_opstatus) != len(ops):
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
                               " expected %s)" %
                               (len(final_opstatus), len(ops)))
    if final_opstatus != expect_opstatus:
      raise errors.OpExecError("Opcode status is %s, expected %s" %
                               (final_opstatus, expect_opstatus))

  ToStdout("Job queue test successful")

  return 0
def ListLocks(opts, args):
  """List all locks.

  Prints the lock list once, or repeatedly every C{opts.interval}
  seconds (separated by an empty line) when an interval is given.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

  def _DashIfEmpty(fn):
    """Wraps a formatter so that empty values are shown as "-".

    Any false value (not only None) is replaced by "-"; other values are
    formatted with C{fn}.

    """
    def wrapper(value):
      if not value:
        return "-"
      return fn(value)
    return wrapper

  def _FormatPending(value):
    """Format pending acquires.

    @param value: iterable of (mode, thread names) pairs
    @return: comma-joined "mode:thread1,thread2" entries

    """
    return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
                           for mode, threads in value)

  # Per-field formatting overrides passed to GenericList
  fmtoverride = {
    "mode": (_DashIfEmpty(str), False),
    "owner": (_DashIfEmpty(",".join), False),
    "pending": (_DashIfEmpty(_FormatPending), False),
    }

  while True:
    ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
                      opts.separator, not opts.no_headers,
                      format_override=fmtoverride, verbose=opts.verbose)

    if ret != constants.EXIT_SUCCESS:
      return ret

    if not opts.interval:
      break

    ToStdout("")
    time.sleep(opts.interval)

  return 0
# Command table: name -> (function, positional args, options, usage, help)
commands = {
  "delay": (
    Delay, [ArgUnknown(min=1, max=1)],
    [cli_option("--no-master", dest="on_master", default=True,
                action="store_false", help="Do not sleep in the master code"),
     cli_option("-n", dest="on_nodes", default=[],
                action="append", help="Select nodes to sleep on"),
     cli_option("-r", "--repeat", type="int", default=0, dest="repeat",
                help="Number of times to repeat the sleep"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
  "submit-job": (
    GenericOpCodes, [ArgFile(min=1)],
    [VERBOSE_OPT,
     cli_option("--op-repeat", type="int", default=1, dest="rep_op",
                help="Repeat the opcode sequence this number of times"),
     cli_option("--job-repeat", type="int", default=1, dest="rep_job",
                help="Repeat the job this number of times"),
     cli_option("--timing-stats", default=False,
                action="store_true", help="Show timing stats"),
     cli_option("--each", default=False, action="store_true",
                help="Submit each job separately"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "<op_list_file...>", "Submits jobs built from json files"
    " containing a list of serialized opcodes"),
  "iallocator": (
    TestAllocator, [ArgUnknown(min=1)],
    [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
                choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
                help="Show allocator input (in) or allocator"
                " results (out)"),
     IALLOCATOR_OPT,
     cli_option("-m", "--mode", default="relocate",
                choices=list(constants.VALID_IALLOCATOR_MODES),
                help=("Request mode (one of %s)" %
                      utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
     cli_option("--memory", default=128, type="unit",
                help="Memory size for the instance (MiB)"),
     cli_option("--disks", default="4096,4096",
                help="Comma separated list of disk sizes (MiB)"),
     DISK_TEMPLATE_OPT,
     cli_option("--nics", default="00:11:22:33:44:55",
                help="Comma separated list of nics, each nic"
                " definition is of form mac/ip/bridge, if"
                " missing values are replaced by None"),
     OS_OPT,
     cli_option("-p", "--vcpus", default=1, type="int",
                help="Select number of VCPUs for the instance"),
     cli_option("--tags", default=None,
                help="Comma separated list of tags"),
     cli_option("--evac-mode", default=constants.IALLOCATOR_NEVAC_ALL,
                choices=list(constants.IALLOCATOR_NEVAC_MODES),
                help=("Node evacuation mode (one of %s)" %
                      utils.CommaJoin(constants.IALLOCATOR_NEVAC_MODES))),
     # None instead of a shared [] default for an "append" option;
     # TestAllocator turns None into an empty list
     cli_option("--target-groups", help="Target groups for relocation",
                default=None, action="append"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "[opts...] <instance>", "Executes a TestAllocator OpCode"),
  "test-jobqueue": (
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
    "", "Test a few aspects of the job queue"),
  "locks": (
    ListLocks, ARGS_NONE,
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
    "[--interval N]", "Show a list of locks in the master daemon"),
  }

# Alternative command names
aliases = {
  "allocator": "iallocator",
  }
691
692
695