1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Job related commands"""
31
32
33
34
35
36
37
38 from ganeti.cli import *
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import cli
43 from ganeti import qlang
44
45
46
47 _LIST_DEF_FIELDS = ["id", "status", "summary"]
48
49
50
51 _USER_JOB_STATUS = {
52 constants.JOB_STATUS_QUEUED: "queued",
53 constants.JOB_STATUS_WAITING: "waiting",
54 constants.JOB_STATUS_CANCELING: "canceling",
55 constants.JOB_STATUS_RUNNING: "running",
56 constants.JOB_STATUS_CANCELED: "canceled",
57 constants.JOB_STATUS_SUCCESS: "success",
58 constants.JOB_STATUS_ERROR: "error",
59 }
60
61
70
71
77
78
79 _JOB_LIST_FORMAT = {
80 "status": (_FormatStatus, False),
81 "summary": (_FormatSummary, False),
82 }
83 _JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
84 (lambda value: map(FormatTimestamp,
85 value),
86 None)))
87
88
90 """Parses a list of string job IDs into integers.
91
92 @param args: list of strings
93 @return: list of integers
94 @raise OpPrereqError: in case of invalid values
95
96 """
97 try:
98 return [int(a) for a in args]
99 except (ValueError, TypeError), err:
100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
101 errors.ECODE_INVAL)
102
103
105 """List the jobs
106
107 @param opts: the command line options selected by the user
108 @type args: list
109 @param args: should be an empty list
110 @rtype: int
111 @return: the desired exit code
112
113 """
114 selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS)
115
116 if opts.archived and "archived" not in selected_fields:
117 selected_fields.append("archived")
118
119 qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)
120
121 cl = GetClient(query=True)
122
123 return GenericList(constants.QR_JOB, selected_fields, args, None,
124 opts.separator, not opts.no_headers,
125 format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
126 force_filter=opts.force_filter, namefield="id",
127 qfilter=qfilter, isnumeric=True, cl=cl)
128
129
131 """List job fields.
132
133 @param opts: the command line options selected by the user
134 @type args: list
135 @param args: fields to list, or empty for all
136 @rtype: int
137 @return: the desired exit code
138
139 """
140 cl = GetClient(query=True)
141
142 return GenericListFields(constants.QR_JOB, args, opts.separator,
143 not opts.no_headers, cl=cl)
144
145
147 """Archive jobs.
148
149 @param opts: the command line options selected by the user
150 @type args: list
151 @param args: should contain the job IDs to be archived
152 @rtype: int
153 @return: the desired exit code
154
155 """
156 client = GetClient()
157
158 rcode = 0
159 for job_id in args:
160 if not client.ArchiveJob(job_id):
161 ToStderr("Failed to archive job with ID '%s'", job_id)
162 rcode = 1
163
164 return rcode
165
166
168 """Archive jobs based on age.
169
170 This will archive jobs based on their age, or all jobs if a 'all' is
171 passed.
172
173 @param opts: the command line options selected by the user
174 @type args: list
175 @param args: should contain only one element, the age as a time spec
176 that can be parsed by L{ganeti.cli.ParseTimespec} or the
177 keyword I{all}, which will cause all jobs to be archived
178 @rtype: int
179 @return: the desired exit code
180
181 """
182 client = GetClient()
183
184 age = args[0]
185
186 if age == "all":
187 age = -1
188 else:
189 age = ParseTimespec(age)
190
191 (archived_count, jobs_left) = client.AutoArchiveJobs(age)
192 ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)
193
194 return 0
195
196
197 -def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
198 """Applies a function to multipe jobs.
199
200 @param opts: Command line options
201 @type args: list
202 @param args: Job IDs
203 @rtype: int
204 @return: Exit code
205
206 """
207 if cl is None:
208 cl = GetClient()
209
210 if stdout_fn is None:
211 stdout_fn = ToStdout
212
213 if ask_fn is None:
214 ask_fn = AskUser
215
216 result = constants.EXIT_SUCCESS
217
218 if bool(args) ^ (opts.status_filter is None):
219 raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
220 " specified and never both", errors.ECODE_INVAL)
221
222 if opts.status_filter is not None:
223 response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
224 qlang.MakeSimpleFilter("status", opts.status_filter))
225
226 jobs = [i for ((_, i), _, _) in response.data]
227 if not jobs:
228 raise errors.OpPrereqError("No jobs with the requested status have been"
229 " found", errors.ECODE_STATE)
230
231 if not opts.force:
232 (_, table) = FormatQueryResult(response, header=True,
233 format_override=_JOB_LIST_FORMAT)
234 for line in table:
235 stdout_fn(line)
236
237 if not ask_fn(question):
238 return constants.EXIT_CONFIRMATION
239 else:
240 jobs = args
241
242 for job_id in jobs:
243 (success, msg) = action_fn(cl, job_id)
244
245 if not success:
246 result = constants.EXIT_FAILURE
247
248 stdout_fn(msg)
249
250 return result
251
252
254 """Cancel not-yet-started jobs.
255
256 @param opts: the command line options selected by the user
257 @type args: list
258 @param args: should contain the job IDs to be cancelled
259 @rtype: int
260 @return: the desired exit code
261
262 """
263 return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
264 "Cancel job(s) listed above?",
265 lambda cl, job_id: cl.CancelJob(job_id))
266
267
269 """Change priority of jobs.
270
271 @param opts: Command line options
272 @type args: list
273 @param args: Job IDs
274 @rtype: int
275 @return: Exit code
276
277 """
278 if opts.priority is None:
279 ToStderr("--priority option must be given.")
280 return constants.EXIT_FAILURE
281
282 return _MultiJobAction(opts, args, None, None, None,
283 "Change priority of job(s) listed above?",
284 lambda cl, job_id:
285 cl.ChangeJobPriority(job_id, opts.priority))
286
287
289 """Show detailed information about jobs.
290
291 @param opts: the command line options selected by the user
292 @type args: list
293 @param args: should contain the job IDs to be queried
294 @rtype: int
295 @return: the desired exit code
296
297 """
298 def format_msg(level, text):
299 """Display the text indented."""
300 ToStdout("%s%s", " " * level, text)
301
302 def result_helper(value):
303 """Format a result field in a nice way."""
304 if isinstance(value, (tuple, list)):
305 return "[%s]" % utils.CommaJoin(value)
306 else:
307 return str(value)
308
309 selected_fields = [
310 "id", "status", "ops", "opresult", "opstatus", "oplog",
311 "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
312 ]
313
314 qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
315 cl = GetClient(query=True)
316 result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data
317
318 first = True
319
320 for entry in result:
321 if not first:
322 format_msg(0, "")
323 else:
324 first = False
325
326 ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
327 (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
328 (_, start_ts), (_, end_ts)) = entry
329
330
331 if rs_status != constants.RS_NORMAL:
332 format_msg(0, "Job ID %s not found" % job_id)
333 continue
334
335 format_msg(0, "Job ID: %s" % job_id)
336 if status in _USER_JOB_STATUS:
337 status = _USER_JOB_STATUS[status]
338 else:
339 raise errors.ProgrammerError("Unknown job status code '%s'" % status)
340
341 format_msg(1, "Status: %s" % status)
342
343 if recv_ts is not None:
344 format_msg(1, "Received: %s" % FormatTimestamp(recv_ts))
345 else:
346 format_msg(1, "Missing received timestamp (%s)" % str(recv_ts))
347
348 if start_ts is not None:
349 if recv_ts is not None:
350 d1 = start_ts[0] - recv_ts[0] + (start_ts[1] - recv_ts[1]) / 1000000.0
351 delta = " (delta %.6fs)" % d1
352 else:
353 delta = ""
354 format_msg(1, "Processing start: %s%s" %
355 (FormatTimestamp(start_ts), delta))
356 else:
357 format_msg(1, "Processing start: unknown (%s)" % str(start_ts))
358
359 if end_ts is not None:
360 if start_ts is not None:
361 d2 = end_ts[0] - start_ts[0] + (end_ts[1] - start_ts[1]) / 1000000.0
362 delta = " (delta %.6fs)" % d2
363 else:
364 delta = ""
365 format_msg(1, "Processing end: %s%s" %
366 (FormatTimestamp(end_ts), delta))
367 else:
368 format_msg(1, "Processing end: unknown (%s)" % str(end_ts))
369
370 if end_ts is not None and recv_ts is not None:
371 d3 = end_ts[0] - recv_ts[0] + (end_ts[1] - recv_ts[1]) / 1000000.0
372 format_msg(1, "Total processing time: %.6f seconds" % d3)
373 else:
374 format_msg(1, "Total processing time: N/A")
375 format_msg(1, "Opcodes:")
376 for (opcode, result, status, log, s_ts, x_ts, e_ts) in \
377 zip(ops, opresult, opstatus, oplog, opstart, opexec, opend):
378 format_msg(2, "%s" % opcode["OP_ID"])
379 format_msg(3, "Status: %s" % status)
380 if isinstance(s_ts, (tuple, list)):
381 format_msg(3, "Processing start: %s" % FormatTimestamp(s_ts))
382 else:
383 format_msg(3, "No processing start time")
384 if isinstance(x_ts, (tuple, list)):
385 format_msg(3, "Execution start: %s" % FormatTimestamp(x_ts))
386 else:
387 format_msg(3, "No execution start time")
388 if isinstance(e_ts, (tuple, list)):
389 format_msg(3, "Processing end: %s" % FormatTimestamp(e_ts))
390 else:
391 format_msg(3, "No processing end time")
392 format_msg(3, "Input fields:")
393 for key in utils.NiceSort(opcode.keys()):
394 if key == "OP_ID":
395 continue
396 val = opcode[key]
397 if isinstance(val, (tuple, list)):
398 val = ",".join([str(item) for item in val])
399 format_msg(4, "%s: %s" % (key, val))
400 if result is None:
401 format_msg(3, "No output data")
402 elif isinstance(result, (tuple, list)):
403 if not result:
404 format_msg(3, "Result: empty sequence")
405 else:
406 format_msg(3, "Result:")
407 for elem in result:
408 format_msg(4, result_helper(elem))
409 elif isinstance(result, dict):
410 if not result:
411 format_msg(3, "Result: empty dictionary")
412 else:
413 format_msg(3, "Result:")
414 for key, val in result.iteritems():
415 format_msg(4, "%s: %s" % (key, result_helper(val)))
416 else:
417 format_msg(3, "Result: %s" % result)
418 format_msg(3, "Execution log:")
419 for serial, log_ts, log_type, log_msg in log:
420 time_txt = FormatTimestamp(log_ts)
421 encoded = FormatLogMessage(log_type, log_msg)
422 format_msg(4, "%s:%s:%s %s" % (serial, time_txt, log_type, encoded))
423 return 0
424
425
427 """Follow a job and print its output as it arrives.
428
429 @param opts: the command line options selected by the user
430 @type args: list
431 @param args: Contains the job ID
432 @rtype: int
433 @return: the desired exit code
434
435 """
436 job_id = args[0]
437
438 msg = ("Output from job %s follows" % job_id)
439 ToStdout(msg)
440 ToStdout("-" * len(msg))
441
442 retcode = 0
443 try:
444 cli.PollJob(job_id)
445 except errors.GenericError, err:
446 (retcode, job_result) = cli.FormatError(err)
447 ToStderr("Job %s failed: %s", job_id, job_result)
448
449 return retcode
450
451
453 """Wait for a job to finish, not producing any output.
454
455 @param opts: the command line options selected by the user
456 @type args: list
457 @param args: Contains the job ID
458 @rtype: int
459 @return: the desired exit code
460
461 """
462 job_id = args[0]
463
464 retcode = 0
465 try:
466 cli.PollJob(job_id, feedback_fn=lambda _: None)
467 except errors.GenericError, err:
468 (retcode, job_result) = cli.FormatError(err)
469 ToStderr("Job %s failed: %s", job_id, job_result)
470
471 return retcode
472
473
474 _PENDING_OPT = \
475 cli_option("--pending", default=None,
476 action="store_const", dest="status_filter",
477 const=constants.JOBS_PENDING,
478 help="Select jobs pending execution or being cancelled")
479
480 _RUNNING_OPT = \
481 cli_option("--running", default=None,
482 action="store_const", dest="status_filter",
483 const=frozenset([
484 constants.JOB_STATUS_RUNNING,
485 ]),
486 help="Show jobs currently running only")
487
488 _ERROR_OPT = \
489 cli_option("--error", default=None,
490 action="store_const", dest="status_filter",
491 const=frozenset([
492 constants.JOB_STATUS_ERROR,
493 ]),
494 help="Show failed jobs only")
495
496 _FINISHED_OPT = \
497 cli_option("--finished", default=None,
498 action="store_const", dest="status_filter",
499 const=constants.JOBS_FINALIZED,
500 help="Show finished jobs only")
501
502 _ARCHIVED_OPT = \
503 cli_option("--archived", default=False,
504 action="store_true", dest="archived",
505 help="Include archived jobs in list (slow and expensive)")
506
507 _QUEUED_OPT = \
508 cli_option("--queued", default=None,
509 action="store_const", dest="status_filter",
510 const=frozenset([
511 constants.JOB_STATUS_QUEUED,
512 ]),
513 help="Select queued jobs only")
514
515 _WAITING_OPT = \
516 cli_option("--waiting", default=None,
517 action="store_const", dest="status_filter",
518 const=frozenset([
519 constants.JOB_STATUS_WAITING,
520 ]),
521 help="Select waiting jobs only")
522
523
524 commands = {
525 "list": (
526 ListJobs, [ArgJobId()],
527 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
528 _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
529 "[job_id ...]",
530 "Lists the jobs and their status. The available fields can be shown"
531 " using the \"list-fields\" command (see the man page for details)."
532 " The default field list is (in order): %s." %
533 utils.CommaJoin(_LIST_DEF_FIELDS)),
534 "list-fields": (
535 ListJobFields, [ArgUnknown()],
536 [NOHDR_OPT, SEP_OPT],
537 "[fields...]",
538 "Lists all available fields for jobs"),
539 "archive": (
540 ArchiveJobs, [ArgJobId(min=1)], [],
541 "<job-id> [<job-id> ...]", "Archive specified jobs"),
542 "autoarchive": (
543 AutoArchiveJobs,
544 [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
545 [],
546 "<age>", "Auto archive jobs older than the given age"),
547 "cancel": (
548 CancelJobs, [ArgJobId()],
549 [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
550 "{[--force] {--pending | --queued | --waiting} |"
551 " <job-id> [<job-id> ...]}",
552 "Cancel jobs"),
553 "info": (
554 ShowJobs, [ArgJobId(min=1)], [],
555 "<job-id> [<job-id> ...]",
556 "Show detailed information about the specified jobs"),
557 "wait": (
558 WaitJob, [ArgJobId(min=1, max=1)], [],
559 "<job-id>", "Wait for a job to finish"),
560 "watch": (
561 WatchJob, [ArgJobId(min=1, max=1)], [],
562 "<job-id>", "Follows a job and prints its output as it arrives"),
563 "change-priority": (
564 ChangePriority, [ArgJobId()],
565 [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
566 "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
567 " <job-id> [<job-id> ...]}",
568 "Change the priority of jobs"),
569 }
570
571
572
573 aliases = {
574 "show": "info",
575 }
576
577
580