1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Job related commands"""
31
32
33
34
35
36
37
38 from ganeti.cli import *
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import cli
43 from ganeti import qlang
44
45
46
47 _LIST_DEF_FIELDS = ["id", "status", "summary"]
48
49
50
51 _USER_JOB_STATUS = {
52 constants.JOB_STATUS_QUEUED: "queued",
53 constants.JOB_STATUS_WAITING: "waiting",
54 constants.JOB_STATUS_CANCELING: "canceling",
55 constants.JOB_STATUS_RUNNING: "running",
56 constants.JOB_STATUS_CANCELED: "canceled",
57 constants.JOB_STATUS_SUCCESS: "success",
58 constants.JOB_STATUS_ERROR: "error",
59 }
60
61
70
71
77
78
79 _JOB_LIST_FORMAT = {
80 "status": (_FormatStatus, False),
81 "summary": (_FormatSummary, False),
82 }
83 _JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
84 (lambda value: map(FormatTimestamp,
85 value),
86 None)))
87
88
90 """Parses a list of string job IDs into integers.
91
92 @param args: list of strings
93 @return: list of integers
94 @raise OpPrereqError: in case of invalid values
95
96 """
97 try:
98 return [int(a) for a in args]
99 except (ValueError, TypeError), err:
100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
101 errors.ECODE_INVAL)
102
103
105 """List the jobs
106
107 @param opts: the command line options selected by the user
108 @type args: list
109 @param args: should be an empty list
110 @rtype: int
111 @return: the desired exit code
112
113 """
114 selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS)
115
116 if opts.archived and "archived" not in selected_fields:
117 selected_fields.append("archived")
118
119 qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)
120
121 cl = GetClient()
122
123 return GenericList(constants.QR_JOB, selected_fields, args, None,
124 opts.separator, not opts.no_headers,
125 format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
126 force_filter=opts.force_filter, namefield="id",
127 qfilter=qfilter, isnumeric=True, cl=cl)
128
129
131 """List job fields.
132
133 @param opts: the command line options selected by the user
134 @type args: list
135 @param args: fields to list, or empty for all
136 @rtype: int
137 @return: the desired exit code
138
139 """
140 cl = GetClient()
141
142 return GenericListFields(constants.QR_JOB, args, opts.separator,
143 not opts.no_headers, cl=cl)
144
145
147 """Archive jobs.
148
149 @param opts: the command line options selected by the user
150 @type args: list
151 @param args: should contain the job IDs to be archived
152 @rtype: int
153 @return: the desired exit code
154
155 """
156 client = GetClient()
157
158 rcode = 0
159 for job_id in args:
160 if not client.ArchiveJob(job_id):
161 ToStderr("Failed to archive job with ID '%s'", job_id)
162 rcode = 1
163
164 return rcode
165
166
168 """Archive jobs based on age.
169
170 This will archive jobs based on their age, or all jobs if a 'all' is
171 passed.
172
173 @param opts: the command line options selected by the user
174 @type args: list
175 @param args: should contain only one element, the age as a time spec
176 that can be parsed by L{ganeti.cli.ParseTimespec} or the
177 keyword I{all}, which will cause all jobs to be archived
178 @rtype: int
179 @return: the desired exit code
180
181 """
182 client = GetClient()
183
184 age = args[0]
185
186 if age == "all":
187 age = -1
188 else:
189 age = ParseTimespec(age)
190
191 (archived_count, jobs_left) = client.AutoArchiveJobs(age)
192 ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)
193
194 return 0
195
196
197 -def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
198 """Applies a function to multipe jobs.
199
200 @param opts: Command line options
201 @type args: list
202 @param args: Job IDs
203 @rtype: int
204 @return: Exit code
205
206 """
207 if cl is None:
208 cl = GetClient()
209
210 if stdout_fn is None:
211 stdout_fn = ToStdout
212
213 if ask_fn is None:
214 ask_fn = AskUser
215
216 result = constants.EXIT_SUCCESS
217
218 if bool(args) ^ (opts.status_filter is None):
219 raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
220 " specified and never both", errors.ECODE_INVAL)
221
222 if opts.status_filter is not None:
223 response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
224 qlang.MakeSimpleFilter("status", opts.status_filter))
225
226 jobs = [i for ((_, i), _, _) in response.data]
227 if not jobs:
228 raise errors.OpPrereqError("No jobs with the requested status have been"
229 " found", errors.ECODE_STATE)
230
231 if not opts.force:
232 (_, table) = FormatQueryResult(response, header=True,
233 format_override=_JOB_LIST_FORMAT)
234 for line in table:
235 stdout_fn(line)
236
237 if not ask_fn(question):
238 return constants.EXIT_CONFIRMATION
239 else:
240 jobs = args
241
242 for job_id in jobs:
243 (success, msg) = action_fn(cl, job_id)
244
245 if not success:
246 result = constants.EXIT_FAILURE
247
248 stdout_fn(msg)
249
250 return result
251
252
254 """Cancel not-yet-started jobs.
255
256 @param opts: the command line options selected by the user
257 @type args: list
258 @param args: should contain the job IDs to be cancelled
259 @rtype: int
260 @return: the desired exit code
261
262 """
263 return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
264 "Cancel job(s) listed above?",
265 lambda cl, job_id: cl.CancelJob(job_id))
266
267
269 """Change priority of jobs.
270
271 @param opts: Command line options
272 @type args: list
273 @param args: Job IDs
274 @rtype: int
275 @return: Exit code
276
277 """
278 if opts.priority is None:
279 ToStderr("--priority option must be given.")
280 return constants.EXIT_FAILURE
281
282 return _MultiJobAction(opts, args, None, None, None,
283 "Change priority of job(s) listed above?",
284 lambda cl, job_id:
285 cl.ChangeJobPriority(job_id, opts.priority))
286
287
289 """ Adds the opcode timestamp to the given container.
290
291 """
292 if isinstance(ts, (tuple, list)):
293 container.append((name, FormatTimestamp(ts), "opcode_timestamp"))
294 else:
295 container.append((name, "N/A", "opcode_timestamp"))
296
297
299 """ Calculates the delta between two timestamps.
300
301 """
302 return to_ts[0] - from_ts[0] + (to_ts[1] - from_ts[1]) / 1000000.0
303
304
306 """ Adds the job timestamp to the given container.
307
308 @param prior_ts: The timestamp used to calculate the amount of time that
309 passed since the given timestamp.
310
311 """
312 if ts is not None:
313 delta = ""
314 if prior_ts is not None:
315 delta = " (delta %.6fs)" % _CalcDelta(prior_ts, ts)
316 output = "%s%s" % (FormatTimestamp(ts), delta)
317 container.append((name, output, "job_timestamp"))
318 else:
319 container.append((name, "unknown (%s)" % str(ts), "job_timestamp"))
320
321
323 """Show detailed information about jobs.
324
325 @param opts: the command line options selected by the user
326 @type args: list
327 @param args: should contain the job IDs to be queried
328 @rtype: int
329 @return: the desired exit code
330
331 """
332 selected_fields = [
333 "id", "status", "ops", "opresult", "opstatus", "oplog",
334 "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
335 ]
336
337 qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
338 cl = GetClient()
339 result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data
340
341 job_info_container = []
342
343 for entry in result:
344 ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
345 (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
346 (_, start_ts), (_, end_ts)) = entry
347
348
349 if rs_status != constants.RS_NORMAL:
350 job_info_container.append("Job ID %s not found" % job_id)
351 continue
352
353
354 job_info = [("Job ID", job_id)]
355
356 if status in _USER_JOB_STATUS:
357 status = _USER_JOB_STATUS[status]
358 else:
359 raise errors.ProgrammerError("Unknown job status code '%s'" % status)
360
361 job_info.append(("Status", status))
362
363 _ListJobTimestamp("Received", recv_ts, job_info)
364 _ListJobTimestamp("Processing start", start_ts, job_info, prior_ts=recv_ts)
365 _ListJobTimestamp("Processing end", end_ts, job_info, prior_ts=start_ts)
366
367 if end_ts is not None and recv_ts is not None:
368 job_info.append(("Total processing time", "%.6f seconds" %
369 _CalcDelta(recv_ts, end_ts)))
370 else:
371 job_info.append(("Total processing time", "N/A"))
372
373 opcode_container = []
374 for (opcode, result, status, log, s_ts, x_ts, e_ts) in \
375 zip(ops, opresult, opstatus, oplog, opstart, opexec, opend):
376 opcode_info = []
377 opcode_info.append(("Opcode", opcode["OP_ID"]))
378 opcode_info.append(("Status", status))
379
380 _ListOpcodeTimestamp("Processing start", s_ts, opcode_info)
381 _ListOpcodeTimestamp("Execution start", x_ts, opcode_info)
382 _ListOpcodeTimestamp("Processing end", e_ts, opcode_info)
383
384 opcode_info.append(("Input fields", opcode))
385 opcode_info.append(("Result", result))
386
387 exec_log_container = []
388 for serial, log_ts, log_type, log_msg in log:
389 time_txt = FormatTimestamp(log_ts)
390 encoded = FormatLogMessage(log_type, log_msg)
391
392
393
394
395 exec_log_info = [
396 ("Time", time_txt),
397 ("Content", (serial, log_type, encoded,)),
398 ]
399 exec_log_container.append(exec_log_info)
400 opcode_info.append(("Execution log", exec_log_container))
401
402 opcode_container.append(opcode_info)
403
404 job_info.append(("Opcodes", opcode_container))
405 job_info_container.append(job_info)
406
407 PrintGenericInfo(job_info_container)
408
409 return 0
410
411
413 """Follow a job and print its output as it arrives.
414
415 @param opts: the command line options selected by the user
416 @type args: list
417 @param args: Contains the job ID
418 @rtype: int
419 @return: the desired exit code
420
421 """
422 job_id = args[0]
423
424 msg = ("Output from job %s follows" % job_id)
425 ToStdout(msg)
426 ToStdout("-" * len(msg))
427
428 retcode = 0
429 try:
430 cli.PollJob(job_id)
431 except errors.GenericError, err:
432 (retcode, job_result) = cli.FormatError(err)
433 ToStderr("Job %s failed: %s", job_id, job_result)
434
435 return retcode
436
437
439 """Wait for a job to finish, not producing any output.
440
441 @param opts: the command line options selected by the user
442 @type args: list
443 @param args: Contains the job ID
444 @rtype: int
445 @return: the desired exit code
446
447 """
448 job_id = args[0]
449
450 retcode = 0
451 try:
452 cli.PollJob(job_id, feedback_fn=lambda _: None)
453 except errors.GenericError, err:
454 (retcode, job_result) = cli.FormatError(err)
455 ToStderr("Job %s failed: %s", job_id, job_result)
456
457 return retcode
458
459
460 _PENDING_OPT = \
461 cli_option("--pending", default=None,
462 action="store_const", dest="status_filter",
463 const=constants.JOBS_PENDING,
464 help="Select jobs pending execution or being cancelled")
465
466 _RUNNING_OPT = \
467 cli_option("--running", default=None,
468 action="store_const", dest="status_filter",
469 const=frozenset([
470 constants.JOB_STATUS_RUNNING,
471 ]),
472 help="Show jobs currently running only")
473
474 _ERROR_OPT = \
475 cli_option("--error", default=None,
476 action="store_const", dest="status_filter",
477 const=frozenset([
478 constants.JOB_STATUS_ERROR,
479 ]),
480 help="Show failed jobs only")
481
482 _FINISHED_OPT = \
483 cli_option("--finished", default=None,
484 action="store_const", dest="status_filter",
485 const=constants.JOBS_FINALIZED,
486 help="Show finished jobs only")
487
488 _ARCHIVED_OPT = \
489 cli_option("--archived", default=False,
490 action="store_true", dest="archived",
491 help="Include archived jobs in list (slow and expensive)")
492
493 _QUEUED_OPT = \
494 cli_option("--queued", default=None,
495 action="store_const", dest="status_filter",
496 const=frozenset([
497 constants.JOB_STATUS_QUEUED,
498 ]),
499 help="Select queued jobs only")
500
501 _WAITING_OPT = \
502 cli_option("--waiting", default=None,
503 action="store_const", dest="status_filter",
504 const=frozenset([
505 constants.JOB_STATUS_WAITING,
506 ]),
507 help="Select waiting jobs only")
508
509
510 commands = {
511 "list": (
512 ListJobs, [ArgJobId()],
513 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
514 _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
515 "[job_id ...]",
516 "Lists the jobs and their status. The available fields can be shown"
517 " using the \"list-fields\" command (see the man page for details)."
518 " The default field list is (in order): %s." %
519 utils.CommaJoin(_LIST_DEF_FIELDS)),
520 "list-fields": (
521 ListJobFields, [ArgUnknown()],
522 [NOHDR_OPT, SEP_OPT],
523 "[fields...]",
524 "Lists all available fields for jobs"),
525 "archive": (
526 ArchiveJobs, [ArgJobId(min=1)], [],
527 "<job-id> [<job-id> ...]", "Archive specified jobs"),
528 "autoarchive": (
529 AutoArchiveJobs,
530 [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
531 [],
532 "<age>", "Auto archive jobs older than the given age"),
533 "cancel": (
534 CancelJobs, [ArgJobId()],
535 [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
536 "{[--force] {--pending | --queued | --waiting} |"
537 " <job-id> [<job-id> ...]}",
538 "Cancel jobs"),
539 "info": (
540 ShowJobs, [ArgJobId(min=1)], [],
541 "<job-id> [<job-id> ...]",
542 "Show detailed information about the specified jobs"),
543 "wait": (
544 WaitJob, [ArgJobId(min=1, max=1)], [],
545 "<job-id>", "Wait for a job to finish"),
546 "watch": (
547 WatchJob, [ArgJobId(min=1, max=1)], [],
548 "<job-id>", "Follows a job and prints its output as it arrives"),
549 "change-priority": (
550 ChangePriority, [ArgJobId()],
551 [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
552 "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
553 " <job-id> [<job-id> ...]}",
554 "Change the priority of jobs"),
555 }
556
557
558
559 aliases = {
560 "show": "info",
561 }
562
563
566