1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Debugging commands"""
31
32
33
34
35
36
37 import simplejson
38 import time
39 import socket
40 import logging
41
42 from ganeti.cli import *
43 from ganeti import cli
44 from ganeti import constants
45 from ganeti import opcodes
46 from ganeti import utils
47 from ganeti import errors
48 from ganeti import compat
49 from ganeti import ht
50
51
52
53 _LIST_LOCKS_DEF_FIELDS = [
54 "name",
55 "mode",
56 "owner",
57 "pending",
58 ]
59
60
def Delay(opts, args):
    """Submit a test-delay opcode for the duration given on the command line.

    @param opts: the command line options selected by the user
    @type args: list
    @param args: should contain only one element, the duration of
        the sleep
    @rtype: int
    @return: the desired exit code

    """
    # The duration arrives as text; float() also accepts fractional values.
    sleep_op = opcodes.OpTestDelay(
        duration=float(args[0]),
        on_master=opts.on_master,
        on_nodes=opts.on_nodes,
        repeat=opts.repeat,
        no_locks=opts.no_locks)

    SubmitOrSend(sleep_op, opts)
    return 0
81
82
def GenericOpCodes(opts, args):
    """Queue the opcodes stored in JSON files as jobs and run them.

    Every file named in C{args} is read once per C{opts.rep_job} iteration;
    its opcode list is repeated C{opts.rep_op} times and queued as one job.
    With C{opts.timing_stats} set, progress markers, counters and timings
    are printed as well.

    @param opts: the command line options selected by the user
    @type args: list
    @param args: paths of the files holding the opcode definitions
    @rtype: int
    @return: the desired exit code

    """
    client = cli.GetClient()
    executor = cli.JobExecutor(cl=client, verbose=opts.verbose, opts=opts)
    show_timing = opts.timing_stats

    num_jobs = 0
    num_ops = 0
    if show_timing:
        ToStdout("Loading...")
    for job_idx in range(opts.rep_job):
        for path in args:
            decoded = simplejson.loads(utils.ReadFile(path))
            job_ops = ([opcodes.OpCode.LoadOpCode(item) for item in decoded] *
                       opts.rep_op)
            executor.QueueJob("file %s/%d" % (path, job_idx), *job_ops)
            num_ops += len(job_ops)
            num_jobs += 1

    if show_timing:
        submit_start = time.time()
        ToStdout("Submitting...")

    executor.SubmitPending(each=opts.each)

    if show_timing:
        exec_start = time.time()
        ToStdout("Executing...")

    executor.GetResults()

    if show_timing:
        finished = time.time()
        report = [
            ("C:op %4d", num_ops),
            ("C:job %4d", num_jobs),
            ("T:submit %4.4f", exec_start - submit_start),
            ("T:exec %4.4f", finished - exec_start),
            ("T:total %4.4f", finished - submit_start),
            ]
        for (fmt, value) in report:
            ToStdout(fmt % value)

    return 0
130
131
133 """Runs the test allocator opcode.
134
135 @param opts: the command line options selected by the user
136 @type args: list
137 @param args: should contain only one element, the iallocator name
138 @rtype: int
139 @return: the desired exit code
140
141 """
142 try:
143 disks = [{
144 constants.IDISK_SIZE: utils.ParseUnit(val),
145 constants.IDISK_MODE: constants.DISK_RDWR,
146 } for val in opts.disks.split(",")]
147 except errors.UnitParseError, err:
148 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
149 return 1
150
151 nics = [val.split("/") for val in opts.nics.split(",")]
152 for row in nics:
153 while len(row) < 3:
154 row.append(None)
155 for i in range(3):
156 if row[i] == "":
157 row[i] = None
158 nic_dict = [{
159 constants.INIC_MAC: v[0],
160 constants.INIC_IP: v[1],
161
162 "bridge": v[2],
163 } for v in nics]
164
165 if opts.tags is None:
166 opts.tags = []
167 else:
168 opts.tags = opts.tags.split(",")
169 if opts.target_groups is None:
170 target_groups = []
171 else:
172 target_groups = opts.target_groups
173
174 op = opcodes.OpTestAllocator(mode=opts.mode,
175 name=args[0],
176 instances=args,
177 memory=opts.memory,
178 disks=disks,
179 disk_template=opts.disk_template,
180 nics=nic_dict,
181 os=opts.os,
182 vcpus=opts.vcpus,
183 tags=opts.tags,
184 direction=opts.direction,
185 iallocator=opts.iallocator,
186 evac_mode=opts.evac_mode,
187 target_groups=target_groups,
188 spindle_use=opts.spindle_use,
189 count=opts.count)
190 result = SubmitOpCode(op, opts=opts)
191 ToStdout("%s" % result)
192 return 0
193
194
196 """Tests job dependencies.
197
198 """
199 ToStdout("Testing job dependencies")
200
201 cl = cli.GetClient()
202
203 try:
204 SubmitOpCode(opcodes.OpTestDelay(duration=0, depends=[(-1, None)]), cl=cl)
205 except errors.GenericError, err:
206 if opts.debug:
207 ToStdout("Ignoring error for 'wrong dependencies' test: %s", err)
208 else:
209 raise errors.OpExecError("Submitting plain opcode with relative job ID"
210 " did not fail as expected")
211
212
213 jobs = [
214 [opcodes.OpTestDelay(duration=1)],
215 [opcodes.OpTestDelay(duration=1,
216 depends=[(-1, [])])],
217 [opcodes.OpTestDelay(duration=1,
218 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
219 [opcodes.OpTestDelay(duration=1,
220 depends=[])],
221 [opcodes.OpTestDelay(duration=1,
222 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
223 ]
224
225
226 check_fn = ht.TListOf(ht.TAnd(ht.TIsLength(2),
227 ht.TItems([ht.TBool,
228 ht.TOr(ht.TNonEmptyString,
229 ht.TJobId)])))
230
231 result = cl.SubmitManyJobs(jobs)
232 if not check_fn(result):
233 raise errors.OpExecError("Job submission doesn't match %s: %s" %
234 (check_fn, result))
235
236
237 jex = JobExecutor(cl=cl, opts=opts)
238
239 for (status, job_id) in result:
240 jex.AddJobId(None, status, job_id)
241
242 job_results = jex.GetResults()
243 if not compat.all(row[0] for row in job_results):
244 raise errors.OpExecError("At least one of the submitted jobs failed: %s" %
245 job_results)
246
247
248 data = cl.QueryJobs([job_id for (_, job_id) in result],
249 ["id", "opexec", "ops"])
250 data_job_id = [job_id for (job_id, _, _) in data]
251 data_opexec = [opexec for (_, opexec, _) in data]
252 data_op = [[opcodes.OpCode.LoadOpCode(op) for op in ops]
253 for (_, _, ops) in data]
254
255 assert compat.all(not op.depends or len(op.depends) == 1
256 for ops in data_op
257 for op in ops)
258
259
260 for (job_idx, res_jobdep) in [(1, data_job_id[0]),
261 (2, data_job_id[0]),
262 (4, data_job_id[2])]:
263 if data_op[job_idx][0].depends[0][0] != res_jobdep:
264 raise errors.OpExecError("Job %s's opcode doesn't depend on correct job"
265 " ID (%s)" % (job_idx, res_jobdep))
266
267
268 if not (data_opexec[0] <= data_opexec[1] and
269 data_opexec[0] <= data_opexec[2] and
270 data_opexec[2] <= data_opexec[4]):
271 raise errors.OpExecError("Jobs did not run in correct order: %s" % data)
272
273 assert len(jobs) == 5 and compat.all(len(ops) == 1 for ops in jobs)
274
275 ToStdout("Job dependency tests were successful")
276
277
279 """Tests submitting jobs.
280
281 """
282 ToStdout("Testing job submission")
283
284 testdata = [
285 (0, 0, constants.OP_PRIO_LOWEST),
286 (0, 0, constants.OP_PRIO_HIGHEST),
287 ]
288
289 for priority in (constants.OP_PRIO_SUBMIT_VALID |
290 frozenset([constants.OP_PRIO_LOWEST,
291 constants.OP_PRIO_HIGHEST])):
292 for offset in [-1, +1]:
293 testdata.extend([
294 (0, 0, priority + offset),
295 (3, 0, priority + offset),
296 (0, 3, priority + offset),
297 (4, 2, priority + offset),
298 ])
299
300 cl = cli.GetClient()
301
302 for before, after, failpriority in testdata:
303 ops = []
304 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
305 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
306 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
307
308 try:
309 cl.SubmitJob(ops)
310 except errors.GenericError, err:
311 if opts.debug:
312 ToStdout("Ignoring error for 'wrong priority' test: %s", err)
313 else:
314 raise errors.OpExecError("Submitting opcode with priority %s did not"
315 " fail when it should (allowed are %s)" %
316 (failpriority, constants.OP_PRIO_SUBMIT_VALID))
317
318 jobs = [
319 [opcodes.OpTestDelay(duration=0),
320 opcodes.OpTestDelay(duration=0, dry_run=False),
321 opcodes.OpTestDelay(duration=0, dry_run=True)],
322 ops,
323 ]
324 result = cl.SubmitManyJobs(jobs)
325 if not (len(result) == 2 and
326 compat.all(len(i) == 2 for i in result) and
327 isinstance(result[0][1], int) and
328 isinstance(result[1][1], basestring) and
329 result[0][0] and not result[1][0]):
330 raise errors.OpExecError("Submitting multiple jobs did not work as"
331 " expected, result %s" % result)
332 assert len(result) == 2
333
334 ToStdout("Job submission tests were successful")
335
336
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
    """Job poll reporter that verifies job queue test notifications.

    Test log messages are collected instead of being printed, and every
    job queue test notification is checked against the job's current status.

    """
    def __init__(self):
        """Initializes this class.

        """
        cli.StdioJobPollReportCb.__init__(self)
        self._expected_msgcount = 0
        # Every test log message received over the reporter's lifetime
        self._all_testmsgs = []
        # Test log messages since the last start message; None until the
        # first start message has been processed
        self._testmsgs = None
        # ID of the single job this reporter is bound to
        self._job_id = None

    def GetTestMessages(self):
        """Returns all test log messages received so far.

        """
        return self._all_testmsgs

    def GetJobId(self):
        """Returns the job ID.

        """
        return self._job_id

    def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
        """Handles a log message.

        Job queue test notifications and test log messages are consumed
        here; everything else is passed on to the base class.

        @raise errors.ProgrammerError: if the reporter is used for more than
            one job
        @raise errors.OpExecError: if a test log message arrives before any
            start message

        """
        if self._job_id is None:
            self._job_id = job_id
        elif self._job_id != job_id:
            raise errors.ProgrammerError("The same reporter instance was used for"
                                         " more than one job")

        if log_type == constants.ELOG_JQUEUE_TEST:
            (sockname, test, arg) = log_msg
            return self._ProcessTestMessage(job_id, sockname, test, arg)

        elif (log_type == constants.ELOG_MESSAGE and
              log_msg.startswith(constants.JQT_MSGPREFIX)):
            if self._testmsgs is None:
                raise errors.OpExecError("Received test message without a preceding"
                                         " start message")
            # Store the message without its marker prefix
            testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
            self._testmsgs.append(testmsg)
            self._all_testmsgs.append(testmsg)
            return

        return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                         timestamp, log_type,
                                                         log_msg)

    def _ProcessTestMessage(self, job_id, sockname, test, arg):
        """Handles a job queue test message.

        @param job_id: ID of the job the message belongs to
        @param sockname: path of the UNIX socket to connect to
        @param test: kind of notification, one of C{constants.JQT_ALL}
        @param arg: for C{JQT_STARTMSG} the number of test messages to
            expect, for C{JQT_LOGMSG} the number of test messages that must
            have been received so far
        @raise errors.OpExecError: if the notification is unknown, the job
            cannot be found, its status does not fit the notification, or the
            number of received test messages is wrong

        """
        if test not in constants.JQT_ALL:
            raise errors.OpExecError("Received invalid test message %s" % test)

        # Nothing is sent over the socket; presumably the connection itself
        # is the acknowledgement the sender waits for (to be confirmed
        # against the opcode's implementation).
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(30.0)

            logging.debug("Connecting to %s", sockname)
            sock.connect(sockname)

            logging.debug("Checking status")
            jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
            if not jobdetails:
                raise errors.OpExecError("Can't find job %s" % job_id)

            status = jobdetails[0]

            logging.debug("Status of job %s is %s", job_id, status)

            if test == constants.JQT_EXPANDNAMES:
                if status != constants.JOB_STATUS_WAITING:
                    raise errors.OpExecError("Job status while expanding names is '%s',"
                                             " not '%s' as expected" %
                                             (status, constants.JOB_STATUS_WAITING))
            elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
                if status != constants.JOB_STATUS_RUNNING:
                    raise errors.OpExecError("Job status while executing opcode is '%s',"
                                             " not '%s' as expected" %
                                             (status, constants.JOB_STATUS_RUNNING))

            if test == constants.JQT_STARTMSG:
                logging.debug("Expecting %s test messages", arg)
                self._testmsgs = []
            elif test == constants.JQT_LOGMSG:
                # Without a start message there is no list to count; report
                # it the same way ReportLogMessage does instead of failing
                # with a TypeError on len(None).
                if self._testmsgs is None:
                    raise errors.OpExecError("Received test message count without"
                                             " a preceding start message")
                if len(self._testmsgs) != arg:
                    raise errors.OpExecError("Received %s test messages when %s are"
                                             " expected" % (len(self._testmsgs), arg))
        finally:
            logging.debug("Closing socket")
            sock.close()
432
433
435 """Runs a few tests on the job queue.
436
437 """
438 _TestJobSubmission(opts)
439 _TestJobDependency(opts)
440
441 (TM_SUCCESS,
442 TM_MULTISUCCESS,
443 TM_FAIL,
444 TM_PARTFAIL) = range(4)
445 TM_ALL = compat.UniqueFrozenset([
446 TM_SUCCESS,
447 TM_MULTISUCCESS,
448 TM_FAIL,
449 TM_PARTFAIL,
450 ])
451
452 for mode in TM_ALL:
453 test_messages = [
454 "Testing mode %s" % mode,
455 "Hello World",
456 "A",
457 "",
458 "B"
459 "Foo|bar|baz",
460 utils.TimestampForFilename(),
461 ]
462
463 fail = mode in (TM_FAIL, TM_PARTFAIL)
464
465 if mode == TM_PARTFAIL:
466 ToStdout("Testing partial job failure")
467 ops = [
468 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
469 log_messages=test_messages, fail=False),
470 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
471 log_messages=test_messages, fail=False),
472 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
473 log_messages=test_messages, fail=True),
474 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
475 log_messages=test_messages, fail=False),
476 ]
477 expect_messages = 3 * [test_messages]
478 expect_opstatus = [
479 constants.OP_STATUS_SUCCESS,
480 constants.OP_STATUS_SUCCESS,
481 constants.OP_STATUS_ERROR,
482 constants.OP_STATUS_ERROR,
483 ]
484 expect_resultlen = 2
485 elif mode == TM_MULTISUCCESS:
486 ToStdout("Testing multiple successful opcodes")
487 ops = [
488 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
489 log_messages=test_messages, fail=False),
490 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
491 log_messages=test_messages, fail=False),
492 ]
493 expect_messages = 2 * [test_messages]
494 expect_opstatus = [
495 constants.OP_STATUS_SUCCESS,
496 constants.OP_STATUS_SUCCESS,
497 ]
498 expect_resultlen = 2
499 else:
500 if mode == TM_SUCCESS:
501 ToStdout("Testing job success")
502 expect_opstatus = [constants.OP_STATUS_SUCCESS]
503 elif mode == TM_FAIL:
504 ToStdout("Testing job failure")
505 expect_opstatus = [constants.OP_STATUS_ERROR]
506 else:
507 raise errors.ProgrammerError("Unknown test mode %s" % mode)
508
509 ops = [
510 opcodes.OpTestJqueue(notify_waitlock=True,
511 notify_exec=True,
512 log_messages=test_messages,
513 fail=fail),
514 ]
515 expect_messages = [test_messages]
516 expect_resultlen = 1
517
518 cl = cli.GetClient()
519 cli.SetGenericOpcodeOpts(ops, opts)
520
521
522 job_id = cli.SendJob(ops, cl=cl)
523
524 reporter = _JobQueueTestReporter()
525 results = None
526
527 try:
528 results = cli.PollJob(job_id, cl=cl, reporter=reporter)
529 except errors.OpExecError, err:
530 if not fail:
531 raise
532 ToStdout("Ignoring error for 'job fail' test: %s", err)
533 else:
534 if fail:
535 raise errors.OpExecError("Job didn't fail when it should")
536
537
538 if fail:
539 if results is not None:
540 raise errors.OpExecError("Received result from failed job")
541 elif len(results) != expect_resultlen:
542 raise errors.OpExecError("Received %s results (%s), expected %s" %
543 (len(results), results, expect_resultlen))
544
545
546 all_messages = [i for j in expect_messages for i in j]
547 if reporter.GetTestMessages() != all_messages:
548 raise errors.OpExecError("Received test messages don't match input"
549 " (input %r, received %r)" %
550 (all_messages, reporter.GetTestMessages()))
551
552
553 reported_job_id = reporter.GetJobId()
554 if reported_job_id != job_id:
555 raise errors.OpExecError("Reported job ID %s doesn't match"
556 "submission job ID %s" %
557 (reported_job_id, job_id))
558
559 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
560 if not jobdetails:
561 raise errors.OpExecError("Can't find job %s" % job_id)
562
563 if fail:
564 exp_status = constants.JOB_STATUS_ERROR
565 else:
566 exp_status = constants.JOB_STATUS_SUCCESS
567
568 (final_status, final_opstatus) = jobdetails
569 if final_status != exp_status:
570 raise errors.OpExecError("Final job status is %s, not %s as expected" %
571 (final_status, exp_status))
572 if len(final_opstatus) != len(ops):
573 raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
574 " expected %s)" %
575 (len(final_opstatus), len(ops)))
576 if final_opstatus != expect_opstatus:
577 raise errors.OpExecError("Opcode status is %s, expected %s" %
578 (final_opstatus, expect_opstatus))
579
580 ToStdout("Job queue test successful")
581
582 return 0
583
584
def ListLocks(opts, args):  # pylint: disable=W0613
    """List all locks.

    The list is printed once, or repeatedly with a pause of
    C{opts.interval} between two listings when an interval is set.

    @param opts: the command line options selected by the user
    @type args: list
    @param args: should be an empty list
    @rtype: int
    @return: the desired exit code

    """
    selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

    def _DashIfEmpty(formatter):
        """Wraps a formatter so that false values are shown as a dash.

        """
        def _Format(value):
            if value:
                return formatter(value)
            return "-"
        return _Format

    def _FormatPending(pending):
        """Format pending acquires.

        """
        entries = []
        for (mode, threads) in pending:
            thread_names = ",".join([str(thread) for thread in threads])
            entries.append("%s:%s" % (mode, thread_names))
        return utils.CommaJoin(entries)

    # Replace the default column formatting for these fields
    fmtoverride = dict((field, (_DashIfEmpty(formatter), False))
                       for (field, formatter) in [
                           ("mode", str),
                           ("owner", ",".join),
                           ("pending", _FormatPending),
                           ])

    while True:
        status = GenericList(constants.QR_LOCK, selected_fields, None, None,
                             opts.separator, not opts.no_headers,
                             format_override=fmtoverride,
                             verbose=opts.verbose)

        if status != constants.EXIT_SUCCESS:
            return status

        # Without an interval the list is shown only once
        if not opts.interval:
            return 0

        ToStdout("")
        time.sleep(opts.interval)
633
634
# Option lists of the individual sub-commands; they are kept apart so that
# the command table below stays readable.
_DELAY_OPTIONS = [
    cli_option("--no-master", dest="on_master", default=True,
               action="store_false",
               help="Do not sleep in the master code"),
    cli_option("-n", dest="on_nodes", default=[],
               action="append",
               help="Select nodes to sleep on"),
    cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
               help="Number of times to repeat the sleep"),
    cli_option("-l", "--no-locks", default=False, dest="no_locks",
               action="store_true",
               help="Don't take locks while performing the delay"),
    DRY_RUN_OPT,
    PRIORITY_OPT,
    ] + SUBMIT_OPTS

_SUBMIT_JOB_OPTIONS = [
    VERBOSE_OPT,
    cli_option("--op-repeat", type="int", default="1", dest="rep_op",
               help="Repeat the opcode sequence this number of times"),
    cli_option("--job-repeat", type="int", default="1", dest="rep_job",
               help="Repeat the job this number of times"),
    cli_option("--timing-stats", default=False,
               action="store_true",
               help="Show timing stats"),
    cli_option("--each", default=False, action="store_true",
               help="Submit each job separately"),
    DRY_RUN_OPT,
    PRIORITY_OPT,
    ]

_IALLOCATOR_OPTIONS = [
    cli_option("--dir", dest="direction",
               default=constants.IALLOCATOR_DIR_IN,
               choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
               help="Show allocator input (in) or allocator results (out)"),
    IALLOCATOR_OPT,
    cli_option("-m", "--mode", default="relocate",
               choices=list(constants.VALID_IALLOCATOR_MODES),
               help=("Request mode (one of %s)" %
                     utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
    cli_option("--memory", default=128, type="unit",
               help="Memory size for the instance (MiB)"),
    cli_option("--disks", default="4096,4096",
               help="Comma separated list of disk sizes (MiB)"),
    DISK_TEMPLATE_OPT,
    cli_option("--nics", default="00:11:22:33:44:55",
               help=("Comma separated list of nics, each nic definition"
                     " is of form mac/ip/bridge, if missing values are"
                     " replace by None")),
    OS_OPT,
    cli_option("-p", "--vcpus", default=1, type="int",
               help="Select number of VCPUs for the instance"),
    cli_option("--tags", default=None,
               help="Comma separated list of tags"),
    cli_option("--evac-mode", default=constants.NODE_EVAC_ALL,
               choices=list(constants.NODE_EVAC_MODES),
               help=("Node evacuation mode (one of %s)" %
                     utils.CommaJoin(constants.NODE_EVAC_MODES))),
    cli_option("--target-groups", help="Target groups for relocation",
               default=[], action="append"),
    cli_option("--spindle-use", help="How many spindles to use",
               default=1, type="int"),
    cli_option("--count", help="How many instances to allocate",
               default=2, type="int"),
    DRY_RUN_OPT,
    PRIORITY_OPT,
    ]

_LOCKS_OPTIONS = [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT]

# Command table; every entry is a tuple of (function, argument definition,
# option list, usage text, description)
commands = {
    "delay": (
        Delay,
        [ArgUnknown(min=1, max=1)],
        _DELAY_OPTIONS,
        "[opts...] <duration>",
        "Executes a TestDelay OpCode"),
    "submit-job": (
        GenericOpCodes,
        [ArgFile(min=1)],
        _SUBMIT_JOB_OPTIONS,
        "<op_list_file...>",
        ("Submits jobs built from json files containing a list of"
         " serialized opcodes")),
    "iallocator": (
        TestAllocator,
        [ArgUnknown(min=1)],
        _IALLOCATOR_OPTIONS,
        "{opts...} <instance>",
        "Executes a TestAllocator OpCode"),
    "test-jobqueue": (
        TestJobqueue,
        ARGS_NONE,
        [PRIORITY_OPT],
        "",
        "Test a few aspects of the job queue"),
    "locks": (
        ListLocks,
        ARGS_NONE,
        _LOCKS_OPTIONS,
        "[--interval N]",
        "Show a list of locks in the master daemon"),
    }
710
711
#: Alternative command names, mapped to the name used in the command table
aliases = {"allocator": "iallocator"}
715
716
719