1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Debugging commands"""
22
23
24
25
26
27
28 import simplejson
29 import time
30 import socket
31 import logging
32
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
40 from ganeti import ht
41
42
43
44 _LIST_LOCKS_DEF_FIELDS = [
45 "name",
46 "mode",
47 "owner",
48 "pending",
49 ]
50
51
def Delay(opts, args):
    """Submit a test delay opcode.

    @param opts: the command line options selected by the user; the
        C{on_master}, C{on_nodes} and C{repeat} values are copied into
        the opcode
    @type args: list
    @param args: should contain only one element, the duration of the
        sleep (converted with C{float})
    @rtype: int
    @return: the desired exit code

    """
    sleep_op = opcodes.OpTestDelay(duration=float(args[0]),
                                   on_master=opts.on_master,
                                   on_nodes=opts.on_nodes,
                                   repeat=opts.repeat)
    SubmitOrSend(sleep_op, opts)
    return 0
71
72
def GenericOpCodes(opts, args):
    """Send opcodes loaded from files to the master.

    Every file is read C{opts.rep_job} times; each read produces one job
    whose opcode list is the file's content repeated C{opts.rep_op} times.

    @param opts: the command line options selected by the user
    @type args: list
    @param args: paths of the files holding JSON lists of serialized
        opcodes
    @rtype: int
    @return: the desired exit code

    """
    client = cli.GetClient()
    executor = cli.JobExecutor(cl=client, verbose=opts.verbose, opts=opts)
    show_timing = opts.timing_stats

    total_jobs = 0
    total_ops = 0
    if show_timing:
        ToStdout("Loading...")
    for repetition in range(opts.rep_job):
        for path in args:
            serialized = simplejson.loads(utils.ReadFile(path))
            loaded = [opcodes.OpCode.LoadOpCode(entry) for entry in serialized]
            job_ops = loaded * opts.rep_op
            executor.QueueJob("file %s/%d" % (path, repetition), *job_ops)
            total_ops += len(job_ops)
            total_jobs += 1

    # The timestamps are only taken (and later used) when timing statistics
    # were requested.
    if show_timing:
        submit_started = time.time()
        ToStdout("Submitting...")

    executor.SubmitPending(each=opts.each)

    if show_timing:
        exec_started = time.time()
        ToStdout("Executing...")

    executor.GetResults()

    if show_timing:
        finished = time.time()
        ToStdout("C:op %4d" % total_ops)
        ToStdout("C:job %4d" % total_jobs)
        ToStdout("T:submit %4.4f" % (exec_started - submit_started))
        ToStdout("T:exec %4.4f" % (finished - exec_started))
        ToStdout("T:total %4.4f" % (finished - submit_started))
    return 0
120
121
123 """Runs the test allocator opcode.
124
125 @param opts: the command line options selected by the user
126 @type args: list
127 @param args: should contain only one element, the iallocator name
128 @rtype: int
129 @return: the desired exit code
130
131 """
132 try:
133 disks = [{
134 constants.IDISK_SIZE: utils.ParseUnit(val),
135 constants.IDISK_MODE: constants.DISK_RDWR,
136 } for val in opts.disks.split(",")]
137 except errors.UnitParseError, err:
138 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
139 return 1
140
141 nics = [val.split("/") for val in opts.nics.split(",")]
142 for row in nics:
143 while len(row) < 3:
144 row.append(None)
145 for i in range(3):
146 if row[i] == "":
147 row[i] = None
148 nic_dict = [{
149 constants.INIC_MAC: v[0],
150 constants.INIC_IP: v[1],
151
152 "bridge": v[2],
153 } for v in nics]
154
155 if opts.tags is None:
156 opts.tags = []
157 else:
158 opts.tags = opts.tags.split(",")
159 if opts.target_groups is None:
160 target_groups = []
161 else:
162 target_groups = opts.target_groups
163
164 op = opcodes.OpTestAllocator(mode=opts.mode,
165 name=args[0],
166 instances=args,
167 memory=opts.memory,
168 disks=disks,
169 disk_template=opts.disk_template,
170 nics=nic_dict,
171 os=opts.os,
172 vcpus=opts.vcpus,
173 tags=opts.tags,
174 direction=opts.direction,
175 allocator=opts.iallocator,
176 evac_mode=opts.evac_mode,
177 target_groups=target_groups,
178 spindle_use=opts.spindle_use)
179 result = SubmitOpCode(op, opts=opts)
180 ToStdout("%s" % result)
181 return 0
182
183
185 """Tests job dependencies.
186
187 """
188 ToStdout("Testing job dependencies")
189
190 cl = cli.GetClient()
191
192 try:
193 SubmitOpCode(opcodes.OpTestDelay(duration=0, depends=[(-1, None)]), cl=cl)
194 except errors.GenericError, err:
195 if opts.debug:
196 ToStdout("Ignoring error: %s", err)
197 else:
198 raise errors.OpExecError("Submitting plain opcode with relative job ID"
199 " did not fail as expected")
200
201
202 jobs = [
203 [opcodes.OpTestDelay(duration=1)],
204 [opcodes.OpTestDelay(duration=1,
205 depends=[(-1, [])])],
206 [opcodes.OpTestDelay(duration=1,
207 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
208 [opcodes.OpTestDelay(duration=1,
209 depends=[])],
210 [opcodes.OpTestDelay(duration=1,
211 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
212 ]
213
214
215 check_fn = ht.TListOf(ht.TAnd(ht.TIsLength(2),
216 ht.TItems([ht.TBool,
217 ht.TOr(ht.TNonEmptyString,
218 ht.TJobId)])))
219
220 result = cl.SubmitManyJobs(jobs)
221 if not check_fn(result):
222 raise errors.OpExecError("Job submission doesn't match %s: %s" %
223 (check_fn, result))
224
225
226 jex = JobExecutor(cl=cl, opts=opts)
227
228 for (status, job_id) in result:
229 jex.AddJobId(None, status, job_id)
230
231 job_results = jex.GetResults()
232 if not compat.all(row[0] for row in job_results):
233 raise errors.OpExecError("At least one of the submitted jobs failed: %s" %
234 job_results)
235
236
237 data = cl.QueryJobs([job_id for (_, job_id) in result],
238 ["id", "opexec", "ops"])
239 data_job_id = [job_id for (job_id, _, _) in data]
240 data_opexec = [opexec for (_, opexec, _) in data]
241 data_op = [[opcodes.OpCode.LoadOpCode(op) for op in ops]
242 for (_, _, ops) in data]
243
244 assert compat.all(not op.depends or len(op.depends) == 1
245 for ops in data_op
246 for op in ops)
247
248
249 for (job_idx, res_jobdep) in [(1, data_job_id[0]),
250 (2, data_job_id[0]),
251 (4, data_job_id[2])]:
252 if data_op[job_idx][0].depends[0][0] != res_jobdep:
253 raise errors.OpExecError("Job %s's opcode doesn't depend on correct job"
254 " ID (%s)" % (job_idx, res_jobdep))
255
256
257 if not (data_opexec[0] <= data_opexec[1] and
258 data_opexec[0] <= data_opexec[2] and
259 data_opexec[2] <= data_opexec[4]):
260 raise errors.OpExecError("Jobs did not run in correct order: %s" % data)
261
262 assert len(jobs) == 5 and compat.all(len(ops) == 1 for ops in jobs)
263
264 ToStdout("Job dependency tests were successful")
265
266
268 """Tests submitting jobs.
269
270 """
271 ToStdout("Testing job submission")
272
273 testdata = [
274 (0, 0, constants.OP_PRIO_LOWEST),
275 (0, 0, constants.OP_PRIO_HIGHEST),
276 ]
277
278 for priority in (constants.OP_PRIO_SUBMIT_VALID |
279 frozenset([constants.OP_PRIO_LOWEST,
280 constants.OP_PRIO_HIGHEST])):
281 for offset in [-1, +1]:
282 testdata.extend([
283 (0, 0, priority + offset),
284 (3, 0, priority + offset),
285 (0, 3, priority + offset),
286 (4, 2, priority + offset),
287 ])
288
289 cl = cli.GetClient()
290
291 for before, after, failpriority in testdata:
292 ops = []
293 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
294 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
295 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
296
297 try:
298 cl.SubmitJob(ops)
299 except errors.GenericError, err:
300 if opts.debug:
301 ToStdout("Ignoring error: %s", err)
302 else:
303 raise errors.OpExecError("Submitting opcode with priority %s did not"
304 " fail when it should (allowed are %s)" %
305 (failpriority, constants.OP_PRIO_SUBMIT_VALID))
306
307 jobs = [
308 [opcodes.OpTestDelay(duration=0),
309 opcodes.OpTestDelay(duration=0, dry_run=False),
310 opcodes.OpTestDelay(duration=0, dry_run=True)],
311 ops,
312 ]
313 result = cl.SubmitManyJobs(jobs)
314 if not (len(result) == 2 and
315 compat.all(len(i) == 2 for i in result) and
316 compat.all(isinstance(i[1], basestring) for i in result) and
317 result[0][0] and not result[1][0]):
318 raise errors.OpExecError("Submitting multiple jobs did not work as"
319 " expected, result %s" % result)
320 assert len(result) == 2
321
322 ToStdout("Job submission tests were successful")
323
324
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
    """Job poll reporter that intercepts job queue test log entries.

    Test notifications (C{ELOG_JQUEUE_TEST}) and prefixed test messages are
    handled here; everything else is passed on to the parent class.

    """
    def __init__(self):
        """Initializes this class.

        """
        cli.StdioJobPollReportCb.__init__(self)
        self._expected_msgcount = 0
        # All test messages received over the reporter's lifetime
        self._all_testmsgs = []
        # Test messages since the last start notification; None until the
        # first start notification was seen
        self._testmsgs = None
        # ID of the (single) job this reporter is used for
        self._job_id = None

    def GetTestMessages(self):
        """Returns all test log messages received so far.

        """
        return self._all_testmsgs

    def GetJobId(self):
        """Returns the job ID.

        """
        return self._job_id

    def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
        """Handles a log message.

        @raise errors.ProgrammerError: if the reporter is used for more
            than one job
        @raise errors.OpExecError: if a test message arrives without a
            preceding start message

        """
        if self._job_id is None:
            self._job_id = job_id
        elif self._job_id != job_id:
            raise errors.ProgrammerError("The same reporter instance was used for"
                                         " more than one job")

        if log_type == constants.ELOG_JQUEUE_TEST:
            (sockname, test, arg) = log_msg
            return self._ProcessTestMessage(job_id, sockname, test, arg)

        elif (log_type == constants.ELOG_MESSAGE and
              log_msg.startswith(constants.JQT_MSGPREFIX)):
            if self._testmsgs is None:
                raise errors.OpExecError("Received test message without a preceding"
                                         " start message")
            # Record the message without its prefix
            testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
            self._testmsgs.append(testmsg)
            self._all_testmsgs.append(testmsg)
            return

        return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                         timestamp, log_type,
                                                         log_msg)

    def _ProcessTestMessage(self, job_id, sockname, test, arg):
        """Handles a job queue test message.

        Connects to the Unix socket named in the message, checks the job's
        current status against what is expected for the notification type
        and keeps track of the expected test messages.

        @raise errors.OpExecError: for unknown notifications, unexpected
            job states or message counts

        """
        if test not in constants.JQT_ALL:
            raise errors.OpExecError("Received invalid test message %s" % test)

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(30.0)

            logging.debug("Connecting to %s", sockname)
            sock.connect(sockname)

            logging.debug("Checking status")
            jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
            if not jobdetails:
                raise errors.OpExecError("Can't find job %s" % job_id)

            status = jobdetails[0]

            logging.debug("Status of job %s is %s", job_id, status)

            if test == constants.JQT_EXPANDNAMES:
                if status != constants.JOB_STATUS_WAITING:
                    raise errors.OpExecError("Job status while expanding names is '%s',"
                                             " not '%s' as expected" %
                                             (status, constants.JOB_STATUS_WAITING))
            elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
                if status != constants.JOB_STATUS_RUNNING:
                    raise errors.OpExecError("Job status while executing opcode is '%s',"
                                             " not '%s' as expected" %
                                             (status, constants.JOB_STATUS_RUNNING))

            if test == constants.JQT_STARTMSG:
                logging.debug("Expecting %s test messages", arg)
                self._testmsgs = []
            elif test == constants.JQT_LOGMSG:
                # Without a start notification there is no list to count;
                # fail the same way ReportLogMessage does instead of
                # raising a TypeError from len(None)
                if self._testmsgs is None:
                    raise errors.OpExecError("Received test message without a"
                                             " preceding start message")
                if len(self._testmsgs) != arg:
                    raise errors.OpExecError("Received %s test messages when %s are"
                                             " expected" % (len(self._testmsgs), arg))
        finally:
            logging.debug("Closing socket")
            sock.close()
420
421
423 """Runs a few tests on the job queue.
424
425 """
426 _TestJobSubmission(opts)
427 _TestJobDependency(opts)
428
429 (TM_SUCCESS,
430 TM_MULTISUCCESS,
431 TM_FAIL,
432 TM_PARTFAIL) = range(4)
433 TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
434
435 for mode in TM_ALL:
436 test_messages = [
437 "Testing mode %s" % mode,
438 "Hello World",
439 "A",
440 "",
441 "B"
442 "Foo|bar|baz",
443 utils.TimestampForFilename(),
444 ]
445
446 fail = mode in (TM_FAIL, TM_PARTFAIL)
447
448 if mode == TM_PARTFAIL:
449 ToStdout("Testing partial job failure")
450 ops = [
451 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
452 log_messages=test_messages, fail=False),
453 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
454 log_messages=test_messages, fail=False),
455 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
456 log_messages=test_messages, fail=True),
457 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
458 log_messages=test_messages, fail=False),
459 ]
460 expect_messages = 3 * [test_messages]
461 expect_opstatus = [
462 constants.OP_STATUS_SUCCESS,
463 constants.OP_STATUS_SUCCESS,
464 constants.OP_STATUS_ERROR,
465 constants.OP_STATUS_ERROR,
466 ]
467 expect_resultlen = 2
468 elif mode == TM_MULTISUCCESS:
469 ToStdout("Testing multiple successful opcodes")
470 ops = [
471 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
472 log_messages=test_messages, fail=False),
473 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
474 log_messages=test_messages, fail=False),
475 ]
476 expect_messages = 2 * [test_messages]
477 expect_opstatus = [
478 constants.OP_STATUS_SUCCESS,
479 constants.OP_STATUS_SUCCESS,
480 ]
481 expect_resultlen = 2
482 else:
483 if mode == TM_SUCCESS:
484 ToStdout("Testing job success")
485 expect_opstatus = [constants.OP_STATUS_SUCCESS]
486 elif mode == TM_FAIL:
487 ToStdout("Testing job failure")
488 expect_opstatus = [constants.OP_STATUS_ERROR]
489 else:
490 raise errors.ProgrammerError("Unknown test mode %s" % mode)
491
492 ops = [
493 opcodes.OpTestJqueue(notify_waitlock=True,
494 notify_exec=True,
495 log_messages=test_messages,
496 fail=fail)
497 ]
498 expect_messages = [test_messages]
499 expect_resultlen = 1
500
501 cl = cli.GetClient()
502 cli.SetGenericOpcodeOpts(ops, opts)
503
504
505 job_id = cli.SendJob(ops, cl=cl)
506
507 reporter = _JobQueueTestReporter()
508 results = None
509
510 try:
511 results = cli.PollJob(job_id, cl=cl, reporter=reporter)
512 except errors.OpExecError, err:
513 if not fail:
514 raise
515 ToStdout("Ignoring error: %s", err)
516 else:
517 if fail:
518 raise errors.OpExecError("Job didn't fail when it should")
519
520
521 if fail:
522 if results is not None:
523 raise errors.OpExecError("Received result from failed job")
524 elif len(results) != expect_resultlen:
525 raise errors.OpExecError("Received %s results (%s), expected %s" %
526 (len(results), results, expect_resultlen))
527
528
529 all_messages = [i for j in expect_messages for i in j]
530 if reporter.GetTestMessages() != all_messages:
531 raise errors.OpExecError("Received test messages don't match input"
532 " (input %r, received %r)" %
533 (all_messages, reporter.GetTestMessages()))
534
535
536 reported_job_id = reporter.GetJobId()
537 if reported_job_id != job_id:
538 raise errors.OpExecError("Reported job ID %s doesn't match"
539 "submission job ID %s" %
540 (reported_job_id, job_id))
541
542 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
543 if not jobdetails:
544 raise errors.OpExecError("Can't find job %s" % job_id)
545
546 if fail:
547 exp_status = constants.JOB_STATUS_ERROR
548 else:
549 exp_status = constants.JOB_STATUS_SUCCESS
550
551 (final_status, final_opstatus) = jobdetails
552 if final_status != exp_status:
553 raise errors.OpExecError("Final job status is %s, not %s as expected" %
554 (final_status, exp_status))
555 if len(final_opstatus) != len(ops):
556 raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
557 " expected %s)" %
558 (len(final_opstatus), len(ops)))
559 if final_opstatus != expect_opstatus:
560 raise errors.OpExecError("Opcode status is %s, expected %s" %
561 (final_opstatus, expect_opstatus))
562
563 ToStdout("Job queue test successful")
564
565 return 0
566
567
def ListLocks(opts, args):
    """List all locks.

    The listing is repeated every C{opts.interval} seconds if an interval
    was given, otherwise it is printed once.

    @param opts: the command line options selected by the user
    @type args: list
    @param args: should be an empty list
    @rtype: int
    @return: the desired exit code

    """
    selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

    def _DashIfNone(formatter):
        """Wraps a formatter so that false values are shown as a dash.

        """
        def _Format(value):
            if value:
                return formatter(value)
            return "-"
        return _Format

    def _FormatPending(value):
        """Format pending acquires.

        """
        entries = ["%s:%s" % (mode, ",".join(threads))
                   for (mode, threads) in value]
        return utils.CommaJoin(entries)

    # Custom formatting for the raw field values
    fmtoverride = {
        "mode": (_DashIfNone(str), False),
        "owner": (_DashIfNone(",".join), False),
        "pending": (_DashIfNone(_FormatPending), False),
        }

    while True:
        status = GenericList(constants.QR_LOCK, selected_fields, None, None,
                             opts.separator, not opts.no_headers,
                             format_override=fmtoverride,
                             verbose=opts.verbose)
        if status != constants.EXIT_SUCCESS:
            return status

        if opts.interval:
            # Separate consecutive listings by an empty line
            ToStdout("")
            time.sleep(opts.interval)
            continue

        return 0
616
617
# Command table; each value appears to be a tuple of (handler function,
# positional argument definitions, option list, usage synopsis, description).
commands = {
    "delay": (
        Delay,
        [ArgUnknown(min=1, max=1)],
        [
            cli_option("--no-master", dest="on_master", default=True,
                       action="store_false",
                       help="Do not sleep in the master code"),
            cli_option("-n", dest="on_nodes", default=[], action="append",
                       help="Select nodes to sleep on"),
            cli_option("-r", "--repeat", type="int", default="0",
                       dest="repeat",
                       help="Number of times to repeat the sleep"),
            DRY_RUN_OPT,
            PRIORITY_OPT,
            SUBMIT_OPT,
        ],
        "[opts...] <duration>",
        "Executes a TestDelay OpCode",
    ),
    "submit-job": (
        GenericOpCodes,
        [ArgFile(min=1)],
        [
            VERBOSE_OPT,
            cli_option("--op-repeat", type="int", default="1", dest="rep_op",
                       help="Repeat the opcode sequence this number of times"),
            cli_option("--job-repeat", type="int", default="1",
                       dest="rep_job",
                       help="Repeat the job this number of times"),
            cli_option("--timing-stats", default=False, action="store_true",
                       help="Show timing stats"),
            cli_option("--each", default=False, action="store_true",
                       help="Submit each job separately"),
            DRY_RUN_OPT,
            PRIORITY_OPT,
        ],
        "<op_list_file...>",
        "Submits jobs built from json files containing a list of serialized"
        " opcodes",
    ),
    "iallocator": (
        TestAllocator,
        [ArgUnknown(min=1)],
        [
            cli_option("--dir", dest="direction",
                       default=constants.IALLOCATOR_DIR_IN,
                       choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
                       help="Show allocator input (in) or allocator results"
                            " (out)"),
            IALLOCATOR_OPT,
            cli_option("-m", "--mode", default="relocate",
                       choices=list(constants.VALID_IALLOCATOR_MODES),
                       help=("Request mode (one of %s)" %
                             utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
            cli_option("--memory", default=128, type="unit",
                       help="Memory size for the instance (MiB)"),
            cli_option("--disks", default="4096,4096",
                       help="Comma separated list of disk sizes (MiB)"),
            DISK_TEMPLATE_OPT,
            cli_option("--nics", default="00:11:22:33:44:55",
                       help="Comma separated list of nics, each nic definition"
                            " is of form mac/ip/bridge, if missing values are"
                            " replace by None"),
            OS_OPT,
            cli_option("-p", "--vcpus", default=1, type="int",
                       help="Select number of VCPUs for the instance"),
            cli_option("--tags", default=None,
                       help="Comma separated list of tags"),
            cli_option("--evac-mode", default=constants.IALLOCATOR_NEVAC_ALL,
                       choices=list(constants.IALLOCATOR_NEVAC_MODES),
                       help=("Node evacuation mode (one of %s)" %
                             utils.CommaJoin(constants.IALLOCATOR_NEVAC_MODES))),
            cli_option("--target-groups", help="Target groups for relocation",
                       default=[], action="append"),
            cli_option("--spindle-use", help="How many spindles to use",
                       default=1, type="int"),
            DRY_RUN_OPT,
            PRIORITY_OPT,
        ],
        "{opts...} <instance>",
        "Executes a TestAllocator OpCode",
    ),
    "test-jobqueue": (
        TestJobqueue,
        ARGS_NONE,
        [PRIORITY_OPT],
        "",
        "Test a few aspects of the job queue",
    ),
    "locks": (
        ListLocks,
        ARGS_NONE,
        [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
        "[--interval N]",
        "Show a list of locks in the master daemon",
    ),
}


# Alternative command names, mapped to the name used in the command table
aliases = {
    "allocator": "iallocator",
}
694
695
698