1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Debugging commands"""
22
23
24
25
26
27
28 import simplejson
29 import time
30 import socket
31 import logging
32
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
40 from ganeti import ht
41
42
43
44 _LIST_LOCKS_DEF_FIELDS = [
45 "name",
46 "mode",
47 "owner",
48 "pending",
49 ]
50
51
def Delay(opts, args):
  """Submits a test delay opcode sleeping for the requested duration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the duration of
      the sleep
  @rtype: int
  @return: the desired exit code

  """
  # The only positional argument is the duration, converted to a float
  delay_op = opcodes.OpTestDelay(duration=float(args[0]),
                                 on_master=opts.on_master,
                                 on_nodes=opts.on_nodes,
                                 repeat=opts.repeat)
  SubmitOrSend(delay_op, opts)
  return 0
71
72
def GenericOpCodes(opts, args):
  """Submits jobs built from files containing serialized opcodes.

  Every file becomes one job per job repetition; the opcode list read from
  a file is repeated C{opts.rep_op} times inside its job. If timing
  statistics were requested, counters and durations are printed once all
  results have been collected.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: paths of the files holding JSON-encoded lists of opcode
      definitions
  @rtype: int
  @return: the desired exit code

  """
  client = cli.GetClient()
  executor = cli.JobExecutor(cl=client, verbose=opts.verbose, opts=opts)

  num_jobs = 0
  num_ops = 0

  if opts.timing_stats:
    ToStdout("Loading...")

  for rep_idx in range(opts.rep_job):
    for path in args:
      raw_ops = simplejson.loads(utils.ReadFile(path))
      job_ops = [opcodes.OpCode.LoadOpCode(item) for item in raw_ops]
      job_ops *= opts.rep_op
      executor.QueueJob("file %s/%d" % (path, rep_idx), *job_ops)
      num_ops += len(job_ops)
      num_jobs += 1

  if opts.timing_stats:
    submit_start = time.time()
    ToStdout("Submitting...")

  executor.SubmitPending(each=opts.each)

  if opts.timing_stats:
    exec_start = time.time()
    ToStdout("Executing...")

  executor.GetResults()

  if opts.timing_stats:
    finished = time.time()
    ToStdout("C:op %4d" % num_ops)
    ToStdout("C:job %4d" % num_jobs)
    ToStdout("T:submit %4.4f" % (exec_start - submit_start))
    ToStdout("T:exec %4.4f" % (finished - exec_start))
    ToStdout("T:total %4.4f" % (finished - submit_start))

  return 0
120
121
def TestAllocator(opts, args):
  """Runs the test allocator opcode.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: instance names; the first one is passed as the opcode's
      C{name} and the whole list as its C{instances} (the allocator itself
      is taken from C{opts.iallocator})
  @rtype: int
  @return: the desired exit code; 1 if a disk size cannot be parsed

  """
  try:
    disks = [{
      constants.IDISK_SIZE: utils.ParseUnit(val),
      constants.IDISK_MODE: constants.DISK_RDWR,
      } for val in opts.disks.split(",")]
  except errors.UnitParseError as err:
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
    return 1

  # Every NIC definition has the form "mac/ip/bridge"; components which are
  # missing or empty are passed as None, anything after the third is ignored
  nic_dict = []
  for spec in opts.nics.split(","):
    (mac, ip, bridge) = [part or None
                         for part in (spec.split("/") + [None] * 3)[:3]]
    nic_dict.append({
      constants.INIC_MAC: mac,
      constants.INIC_IP: ip,
      "bridge": bridge,
      })

  # The parsed tag list is stored back on the options object, as before
  if opts.tags is None:
    opts.tags = []
  else:
    opts.tags = opts.tags.split(",")

  if opts.target_groups is None:
    target_groups = []
  else:
    target_groups = opts.target_groups

  op = opcodes.OpTestAllocator(mode=opts.mode,
                               name=args[0],
                               instances=args,
                               memory=opts.memory,
                               disks=disks,
                               disk_template=opts.disk_template,
                               nics=nic_dict,
                               os=opts.os,
                               vcpus=opts.vcpus,
                               tags=opts.tags,
                               direction=opts.direction,
                               iallocator=opts.iallocator,
                               evac_mode=opts.evac_mode,
                               target_groups=target_groups,
                               spindle_use=opts.spindle_use,
                               count=opts.count)
  result = SubmitOpCode(op, opts=opts)
  ToStdout("%s" % result)
  return 0
183
184
def _TestJobDependency(opts):
  """Tests job dependencies.

  First checks that a single opcode depending on a relative job ID is
  rejected, then submits a batch of jobs using relative dependencies and
  verifies the resolved job IDs and the execution order.

  @param opts: the command line options selected by the user
  @raise errors.OpExecError: if one of the checks fails

  """
  ToStdout("Testing job dependencies")

  cl = cli.GetClient()

  # Submitting a plain opcode with a relative job ID is expected to fail
  try:
    SubmitOpCode(opcodes.OpTestDelay(duration=0, depends=[(-1, None)]), cl=cl)
  except errors.GenericError as err:
    if opts.debug:
      ToStdout("Ignoring error for 'wrong dependencies' test: %s", err)
  else:
    raise errors.OpExecError("Submitting plain opcode with relative job ID"
                             " did not fail as expected")

  jobs = [
    [opcodes.OpTestDelay(duration=1)],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-1, [])])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[])],
    [opcodes.OpTestDelay(duration=1,
                         depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
    ]

  # The checks below use hard-coded job indices and look at the first opcode
  # of each job only, so verify the shape of the test data before relying
  # on it
  assert len(jobs) == 5 and compat.all(len(ops) == 1 for ops in jobs)

  # Every submission result must be a (success, job ID or error text) pair
  check_fn = ht.TListOf(ht.TAnd(ht.TIsLength(2),
                                ht.TItems([ht.TBool,
                                           ht.TOr(ht.TNonEmptyString,
                                                  ht.TJobId)])))

  result = cl.SubmitManyJobs(jobs)
  if not check_fn(result):
    raise errors.OpExecError("Job submission doesn't match %s: %s" %
                             (check_fn, result))

  # Wait for all jobs to finish
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result:
    jex.AddJobId(None, status, job_id)

  job_results = jex.GetResults()
  if not compat.all(row[0] for row in job_results):
    raise errors.OpExecError("At least one of the submitted jobs failed: %s" %
                             job_results)

  # Fetch the information needed for the checks
  data = cl.QueryJobs([job_id for (_, job_id) in result],
                      ["id", "opexec", "ops"])
  data_job_id = [job_id for (job_id, _, _) in data]
  data_opexec = [opexec for (_, opexec, _) in data]
  data_op = [[opcodes.OpCode.LoadOpCode(op) for op in ops]
             for (_, _, ops) in data]

  assert compat.all(not op.depends or len(op.depends) == 1
                    for ops in data_op
                    for op in ops)

  # Relative dependencies must have been replaced by the right job IDs:
  # jobs 1 and 2 depend on job 0, job 4 depends on job 2
  for (job_idx, res_jobdep) in [(1, data_job_id[0]),
                                (2, data_job_id[0]),
                                (4, data_job_id[2])]:
    if data_op[job_idx][0].depends[0][0] != res_jobdep:
      raise errors.OpExecError("Job %s's opcode doesn't depend on correct job"
                               " ID (%s)" % (job_idx, res_jobdep))

  # A job must not have been executed before the job it depends on
  if not (data_opexec[0] <= data_opexec[1] and
          data_opexec[0] <= data_opexec[2] and
          data_opexec[2] <= data_opexec[4]):
    raise errors.OpExecError("Jobs did not run in correct order: %s" % data)

  ToStdout("Job dependency tests were successful")
266
267
def _TestJobSubmission(opts):
  """Tests submitting jobs.

  Checks that jobs containing an opcode with a priority which is not valid
  for submission are rejected, both when submitted alone and when
  submitted together with a valid job.

  @param opts: the command line options selected by the user
  @raise errors.OpExecError: if a submission does not behave as expected

  """
  ToStdout("Testing job submission")

  # Tuples of (opcodes before, opcodes after, priority expected to fail)
  testdata = [
    (0, 0, constants.OP_PRIO_LOWEST),
    (0, 0, constants.OP_PRIO_HIGHEST),
    ]

  for priority in (constants.OP_PRIO_SUBMIT_VALID |
                   frozenset([constants.OP_PRIO_LOWEST,
                              constants.OP_PRIO_HIGHEST])):
    for offset in [-1, +1]:
      testdata.extend([
        (0, 0, priority + offset),
        (3, 0, priority + offset),
        (0, 3, priority + offset),
        (4, 2, priority + offset),
        ])

  cl = cli.GetClient()

  for before, after, failpriority in testdata:
    ops = []
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
    ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])

    try:
      cl.SubmitJob(ops)
    except errors.GenericError as err:
      if opts.debug:
        ToStdout("Ignoring error for 'wrong priority' test: %s", err)
    else:
      raise errors.OpExecError("Submitting opcode with priority %s did not"
                               " fail when it should (allowed are %s)" %
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))

  # "ops" still holds the last job built by the loop above, i.e. one the
  # master is expected to reject; it is submitted next to a valid job
  jobs = [
    [opcodes.OpTestDelay(duration=0),
     opcodes.OpTestDelay(duration=0, dry_run=False),
     opcodes.OpTestDelay(duration=0, dry_run=True)],
    ops,
    ]
  result = cl.SubmitManyJobs(jobs)

  # Expected: (True, <integer job ID>) for the first job and
  # (False, <string>) for the second one; "basestring" covers both string
  # types of Python 2
  if not (len(result) == 2 and
          compat.all(len(i) == 2 for i in result) and
          isinstance(result[0][1], int) and
          isinstance(result[1][1], basestring) and
          result[0][0] and not result[1][0]):
    raise errors.OpExecError("Submitting multiple jobs did not work as"
                             " expected, result %s" % result)

  ToStdout("Job submission tests were successful")
325
326
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter checking the messages of a job queue test.

  Log messages of type C{constants.ELOG_JQUEUE_TEST} and messages starting
  with C{constants.JQT_MSGPREFIX} are handled here; everything else is
  passed on to L{cli.StdioJobPollReportCb}. One instance must only be used
  for a single job.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    self._expected_msgcount = 0
    # All test messages received over the lifetime of this reporter
    self._all_testmsgs = []
    # Test messages received since the last start message; None until the
    # first start message has been seen
    self._testmsgs = None
    self._job_id = None

  def GetTestMessages(self):
    """Returns all test log messages received so far.

    """
    return self._all_testmsgs

  def GetJobId(self):
    """Returns the job ID.

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    @raise errors.ProgrammerError: if messages of more than one job are
        reported to the same instance
    @raise errors.OpExecError: if a test message arrives before a start
        message

    """
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    elif (log_type == constants.ELOG_MESSAGE and
          log_msg.startswith(constants.JQT_MSGPREFIX)):
      if self._testmsgs is None:
        raise errors.OpExecError("Received test message without a preceding"
                                 " start message")
      # Record the message without its prefix
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
      self._testmsgs.append(testmsg)
      self._all_testmsgs.append(testmsg)
      return

    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Handles a job queue test message.

    Connects to the given Unix socket, checks that the job's current status
    fits the reported test step and keeps track of the test messages.

    @param job_id: ID of the job which sent the message
    @param sockname: path of the Unix socket named in the message
    @param test: one of C{constants.JQT_ALL}
    @param arg: for C{constants.JQT_LOGMSG} the number of test messages
        expected so far; only logged for C{constants.JQT_STARTMSG}
    @raise errors.OpExecError: if the message or the job status is not as
        expected

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      # NOTE(review): the connection presumably tells the sender that the
      # message was received - confirm against the opcode implementation
      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      if test == constants.JQT_EXPANDNAMES:
        if status != constants.JOB_STATUS_WAITING:
          raise errors.OpExecError("Job status while expanding names is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_WAITING))
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        if status != constants.JOB_STATUS_RUNNING:
          raise errors.OpExecError("Job status while executing opcode is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_RUNNING))

      if test == constants.JQT_STARTMSG:
        logging.debug("Expecting %s test messages", arg)
        self._testmsgs = []
      elif test == constants.JQT_LOGMSG:
        # Without a start message there is no list to compare against;
        # report this like ReportLogMessage does instead of failing on len()
        if self._testmsgs is None:
          raise errors.OpExecError("Received test message without a preceding"
                                   " start message")
        if len(self._testmsgs) != arg:
          raise errors.OpExecError("Received %s test messages when %s are"
                                   " expected" % (len(self._testmsgs), arg))
    finally:
      logging.debug("Closing socket")
      sock.close()
422
423
def TestJobqueue(opts, _):
  """Runs a few tests on the job queue.

  After the job submission and job dependency tests, one job per test mode
  (success, multiple successful opcodes, failure, partial failure) is sent
  and polled with a L{_JobQueueTestReporter}; results, received test
  messages, the reported job ID and the final job and opcode status are
  then compared with the expectations.

  @param opts: the command line options selected by the user
  @param _: positional command line arguments (unused, the command is
      registered without arguments)
  @rtype: int
  @return: the desired exit code
  @raise errors.OpExecError: if one of the checks fails

  """
  _TestJobSubmission(opts)
  _TestJobDependency(opts)

  (TM_SUCCESS,
   TM_MULTISUCCESS,
   TM_FAIL,
   TM_PARTFAIL) = range(4)
  TM_ALL = compat.UniqueFrozenset([
    TM_SUCCESS,
    TM_MULTISUCCESS,
    TM_FAIL,
    TM_PARTFAIL,
    ])

  for mode in TM_ALL:
    test_messages = [
      "Testing mode %s" % mode,
      "Hello World",
      "A",
      "",
      "B",
      "Foo|bar|baz",
      utils.TimestampForFilename(),
      ]

    fail = mode in (TM_FAIL, TM_PARTFAIL)

    if mode == TM_PARTFAIL:
      ToStdout("Testing partial job failure")
      ops = [opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
                                  log_messages=test_messages, fail=op_fail)
             for op_fail in (False, False, True, False)]
      # Messages are expected from the first three opcodes only, including
      # the failing one
      expect_messages = 3 * [test_messages]
      expect_opstatus = [
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_ERROR,
        constants.OP_STATUS_ERROR,
        ]
      expect_resultlen = 2
    elif mode == TM_MULTISUCCESS:
      ToStdout("Testing multiple successful opcodes")
      ops = [opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
                                  log_messages=test_messages, fail=False)
             for _unused in range(2)]
      expect_messages = 2 * [test_messages]
      expect_opstatus = [
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        ]
      expect_resultlen = 2
    else:
      if mode == TM_SUCCESS:
        ToStdout("Testing job success")
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
      elif mode == TM_FAIL:
        ToStdout("Testing job failure")
        expect_opstatus = [constants.OP_STATUS_ERROR]
      else:
        raise errors.ProgrammerError("Unknown test mode %s" % mode)

      ops = [
        opcodes.OpTestJqueue(notify_waitlock=True,
                             notify_exec=True,
                             log_messages=test_messages,
                             fail=fail),
        ]
      expect_messages = [test_messages]
      expect_resultlen = 1

    cl = cli.GetClient()
    cli.SetGenericOpcodeOpts(ops, opts)

    # Send the job
    job_id = cli.SendJob(ops, cl=cl)

    reporter = _JobQueueTestReporter()
    results = None

    try:
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
    except errors.OpExecError as err:
      if not fail:
        raise
      ToStdout("Ignoring error for 'job fail' test: %s", err)
    else:
      if fail:
        raise errors.OpExecError("Job didn't fail when it should")

    # Check the results; a failed job must not have delivered any
    if fail:
      if results is not None:
        raise errors.OpExecError("Received result from failed job")
    elif len(results) != expect_resultlen:
      raise errors.OpExecError("Received %s results (%s), expected %s" %
                               (len(results), results, expect_resultlen))

    # Check the received test messages
    all_messages = [i for j in expect_messages for i in j]
    if reporter.GetTestMessages() != all_messages:
      raise errors.OpExecError("Received test messages don't match input"
                               " (input %r, received %r)" %
                               (all_messages, reporter.GetTestMessages()))

    # Check the job ID seen by the reporter
    reported_job_id = reporter.GetJobId()
    if reported_job_id != job_id:
      raise errors.OpExecError("Reported job ID %s doesn't match"
                               " submission job ID %s" %
                               (reported_job_id, job_id))

    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
    if not jobdetails:
      raise errors.OpExecError("Can't find job %s" % job_id)

    if fail:
      exp_status = constants.JOB_STATUS_ERROR
    else:
      exp_status = constants.JOB_STATUS_SUCCESS

    (final_status, final_opstatus) = jobdetails
    if final_status != exp_status:
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
                               (final_status, exp_status))
    if len(final_opstatus) != len(ops):
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
                               " expected %s)" %
                               (len(final_opstatus), len(ops)))
    if final_opstatus != expect_opstatus:
      raise errors.OpExecError("Opcode status is %s, expected %s" %
                               (final_opstatus, expect_opstatus))

  ToStdout("Job queue test successful")

  return 0
573
574
def ListLocks(opts, args):
  """Lists all locks, optionally refreshing the list periodically.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

  def _OrDash(formatter):
    """Wraps a formatter so that false values are shown as a dash.

    """
    def _Format(value):
      return formatter(value) if value else "-"
    return _Format

  def _JoinPending(pending):
    """Formats pending acquires as "mode:thread,thread" entries.

    """
    entries = ("%s:%s" % (mode, ",".join(threads))
               for (mode, threads) in pending)
    return utils.CommaJoin(entries)

  # Per-field formatting functions for the generic list output
  overrides = {
    "mode": (_OrDash(str), False),
    "owner": (_OrDash(",".join), False),
    "pending": (_OrDash(_JoinPending), False),
    }

  while True:
    status = GenericList(constants.QR_LOCK, fields, None, None,
                         opts.separator, not opts.no_headers,
                         format_override=overrides, verbose=opts.verbose)

    if status != constants.EXIT_SUCCESS:
      return status

    # Without an interval the list is printed exactly once
    if not opts.interval:
      return 0

    ToStdout("")
    time.sleep(opts.interval)
623
624
# Table of the sub-commands provided by this module. Every entry maps the
# command name to a tuple of (handler function, specification of the
# positional arguments, list of accepted options, usage string, description).
# NOTE(review): the consumer of this table is not visible in this file
# section - presumably the generic CLI main routine; confirm.
commands = {
  # Sleep for a while via a test delay opcode
  "delay": (
    Delay, [ArgUnknown(min=1, max=1)],
    [cli_option("--no-master", dest="on_master", default=True,
                action="store_false", help="Do not sleep in the master code"),
     cli_option("-n", dest="on_nodes", default=[],
                action="append", help="Select nodes to sleep on"),
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
                help="Number of times to repeat the sleep"),
     DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT,
     ],
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
  # Submit arbitrary opcodes read from files
  "submit-job": (
    GenericOpCodes, [ArgFile(min=1)],
    [VERBOSE_OPT,
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
                help="Repeat the opcode sequence this number of times"),
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
                help="Repeat the job this number of times"),
     cli_option("--timing-stats", default=False,
                action="store_true", help="Show timing stats"),
     cli_option("--each", default=False, action="store_true",
                help="Submit each job separately"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "<op_list_file...>", "Submits jobs built from json files"
    " containing a list of serialized opcodes"),
  # Run the test allocator opcode
  "iallocator": (
    TestAllocator, [ArgUnknown(min=1)],
    [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
                choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
                help="Show allocator input (in) or allocator"
                " results (out)"),
     IALLOCATOR_OPT,
     cli_option("-m", "--mode", default="relocate",
                choices=list(constants.VALID_IALLOCATOR_MODES),
                help=("Request mode (one of %s)" %
                      utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
     cli_option("--memory", default=128, type="unit",
                help="Memory size for the instance (MiB)"),
     cli_option("--disks", default="4096,4096",
                help="Comma separated list of disk sizes (MiB)"),
     DISK_TEMPLATE_OPT,
     cli_option("--nics", default="00:11:22:33:44:55",
                help="Comma separated list of nics, each nic"
                " definition is of form mac/ip/bridge, if"
                " missing values are replace by None"),
     OS_OPT,
     cli_option("-p", "--vcpus", default=1, type="int",
                help="Select number of VCPUs for the instance"),
     cli_option("--tags", default=None,
                help="Comma separated list of tags"),
     cli_option("--evac-mode", default=constants.IALLOCATOR_NEVAC_ALL,
                choices=list(constants.IALLOCATOR_NEVAC_MODES),
                help=("Node evacuation mode (one of %s)" %
                      utils.CommaJoin(constants.IALLOCATOR_NEVAC_MODES))),
     cli_option("--target-groups", help="Target groups for relocation",
                default=[], action="append"),
     cli_option("--spindle-use", help="How many spindles to use",
                default=1, type="int"),
     cli_option("--count", help="How many instances to allocate",
                default=2, type="int"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
  # Self-tests of the job queue
  "test-jobqueue": (
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
    "", "Test a few aspects of the job queue"),
  # Show the locks, optionally repeating every N seconds
  "locks": (
    ListLocks, ARGS_NONE,
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
    "[--interval N]", "Show a list of locks in the master daemon"),
  }
698
699
# Alternative command names, each mapped to a key of the "commands" table
aliases = {"allocator": "iallocator"}
703
704
707