1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Job related commands"""
31
32
33
34
35
36
37
38 from ganeti.cli import *
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import cli
43 from ganeti import qlang
44
45
46
# Fields shown by the "list" command when the user does not select any
_LIST_DEF_FIELDS = ["id", "status", "summary"]

# Maps job status constants to the text displayed for them. ShowJobs treats a
# status that is missing from this table as a programming error.
_USER_JOB_STATUS = {
  constants.JOB_STATUS_QUEUED: "queued",
  constants.JOB_STATUS_WAITING: "waiting",
  constants.JOB_STATUS_CANCELING: "canceling",
  constants.JOB_STATUS_RUNNING: "running",
  constants.JOB_STATUS_CANCELED: "canceled",
  constants.JOB_STATUS_SUCCESS: "success",
  constants.JOB_STATUS_ERROR: "error",
  }
60
61
70
71
77
78
# Per-field formatting overrides, passed as "format_override" to GenericList
# and FormatQueryResult. Each value is a (formatting function, flag) tuple.
# NOTE(review): the meaning of the second tuple element (False vs. None) is
# defined by the query formatting code in ganeti.cli, which is not visible
# here; _FormatStatus and _FormatSummary are not part of the reviewed listing
# either -- confirm both before changing this table.
_JOB_LIST_FORMAT = {
  "status": (_FormatStatus, False),
  "summary": (_FormatSummary, False),
  }
# The three per-opcode timestamp fields share one entry: every element of the
# field value is passed through FormatTimestamp.
_JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
                                      (lambda value: map(FormatTimestamp,
                                                         value),
                                       None)))
87
88
90 """Parses a list of string job IDs into integers.
91
92 @param args: list of strings
93 @return: list of integers
94 @raise OpPrereqError: in case of invalid values
95
96 """
97 try:
98 return [int(a) for a in args]
99 except (ValueError, TypeError), err:
100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
101 errors.ECODE_INVAL)
102
103
def ListJobs(opts, args):
  """List the jobs

  @param opts: the command line options selected by the user
  @type args: list
  @param args: job IDs to restrict the listing to; may be empty
  @rtype: int
  @return: the desired exit code

  """
  # Work on a copy: should ParseFields hand back the default list itself,
  # appending below must not modify _LIST_DEF_FIELDS
  selected_fields = list(ParseFields(opts.output, _LIST_DEF_FIELDS))

  # The "archived" field is added whenever --archived is given
  if opts.archived and "archived" not in selected_fields:
    selected_fields.append("archived")

  qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)

  cl = GetClient()

  return GenericList(constants.QR_JOB, selected_fields, args, None,
                     opts.separator, not opts.no_headers,
                     format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
                     force_filter=opts.force_filter, namefield="id",
                     qfilter=qfilter, isnumeric=True, cl=cl)
128
129
def ListJobFields(opts, args):
  """List job fields.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: fields to list, or empty for all
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  return GenericListFields(constants.QR_JOB, args, opts.separator,
                           not opts.no_headers, cl=cl)
144
145
def ArchiveJobs(opts, args):
  """Archive jobs.

  Every job is attempted, even if archiving an earlier one failed.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be archived
  @rtype: int
  @return: the desired exit code; 1 if at least one job could not be
      archived, 0 otherwise

  """
  client = GetClient()

  rcode = 0
  for job_id in args:
    if not client.ArchiveJob(job_id):
      ToStderr("Failed to archive job with ID '%s'", job_id)
      rcode = 1

  return rcode
165
166
def AutoArchiveJobs(opts, args):
  """Archive jobs based on age.

  This will archive jobs based on their age, or all jobs if the keyword
  'all' is passed.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the age as a time spec
      that can be parsed by L{ganeti.cli.ParseTimespec} or the
      keyword I{all}, which will cause all jobs to be archived
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()

  age = args[0]

  if age == "all":
    # The keyword "all" is sent to the client as an age of -1
    age = -1
  else:
    age = ParseTimespec(age)

  (archived_count, jobs_left) = client.AutoArchiveJobs(age)
  ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)

  return 0
195
196
def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
  """Applies a function to multiple jobs.

  The jobs are either given explicitly by ID or selected through a status
  filter; exactly one of the two has to be used. When selecting by status
  and C{opts.force} is not set, the matching jobs are printed and the user
  is asked for confirmation before anything is done.

  @param opts: Command line options; C{status_filter} is always read,
      C{force} only when a status filter is given
  @type args: list
  @param args: Job IDs
  @param cl: Client to use; if C{None}, one is obtained from L{GetClient}
  @param stdout_fn: Function used for output; if C{None}, L{ToStdout}
  @param ask_fn: Function asking the user for confirmation; if C{None},
      L{AskUser}
  @param question: Question passed to C{ask_fn}
  @param action_fn: Callable invoked as C{action_fn(cl, job_id)} for every
      job; must return a C{(success, message)} tuple
  @rtype: int
  @return: Exit code: C{EXIT_CONFIRMATION} if the user declined,
      C{EXIT_FAILURE} if the action failed for at least one job,
      C{EXIT_SUCCESS} otherwise
  @raise errors.OpPrereqError: if both or neither of job IDs and status
      filter are given, or if no job matches the status filter

  """
  if cl is None:
    cl = GetClient()

  if stdout_fn is None:
    stdout_fn = ToStdout

  if ask_fn is None:
    ask_fn = AskUser

  result = constants.EXIT_SUCCESS

  # True when both selection methods are used, or when neither is
  if bool(args) ^ (opts.status_filter is None):
    raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
                               " specified and never both", errors.ECODE_INVAL)

  if opts.status_filter is not None:
    response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
                        qlang.MakeSimpleFilter("status", opts.status_filter))

    # Every row holds one (result status, value) pair per requested field;
    # only the value of the "id" field is needed here
    jobs = [i for ((_, i), _, _) in response.data]
    if not jobs:
      raise errors.OpPrereqError("No jobs with the requested status have been"
                                 " found", errors.ECODE_STATE)

    if not opts.force:
      # Show what is about to be acted upon before asking
      (_, table) = FormatQueryResult(response, header=True,
                                     format_override=_JOB_LIST_FORMAT)
      for line in table:
        stdout_fn(line)

      if not ask_fn(question):
        return constants.EXIT_CONFIRMATION
  else:
    jobs = args

  for job_id in jobs:
    (success, msg) = action_fn(cl, job_id)

    # A failure is remembered, but the remaining jobs are still processed
    if not success:
      result = constants.EXIT_FAILURE

    stdout_fn(msg)

  return result
251
252
def CancelJobs(opts, args, cl=None, _stdout_fn=None, _ask_fn=None):
  """Cancel jobs, or kill them if --kill was given.

  The value of C{opts.kill} is passed on to the client's C{CancelJob} call.
  Using --kill requires --yes-do-it as an additional confirmation.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be cancelled
  @param cl: client to use, passed through to L{_MultiJobAction}
  @param _stdout_fn: output function, passed through to L{_MultiJobAction}
  @param _ask_fn: confirmation function, passed through to
      L{_MultiJobAction}
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if --kill is used without --yes-do-it

  """
  # NOTE(review): the "def" line was absent from the reviewed listing. The
  # three extra parameters are taken from the body below; their defaults are
  # assumed to be None, which _MultiJobAction replaces with GetClient(),
  # ToStdout and AskUser respectively -- confirm against the real header.
  if opts.kill:
    action_name = "KILL"
    if not opts.yes_do_it:
      raise errors.OpPrereqError("The --kill option must be confirmed"
                                 " with --yes-do-it", errors.ECODE_INVAL)
  else:
    action_name = "Cancel"
  return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
                         "%s job(s) listed above?" % action_name,
                         lambda cl, job_id: cl.CancelJob(job_id,
                                                         kill=opts.kill))
274
275
def ChangePriority(opts, args):
  """Change priority of jobs.

  @param opts: Command line options; C{priority} must be set
  @type args: list
  @param args: Job IDs
  @rtype: int
  @return: Exit code; C{EXIT_FAILURE} if no priority was given, otherwise
      the result of L{_MultiJobAction}

  """
  if opts.priority is None:
    ToStderr("--priority option must be given.")
    return constants.EXIT_FAILURE

  return _MultiJobAction(opts, args, None, None, None,
                         "Change priority of job(s) listed above?",
                         lambda cl, job_id:
                           cl.ChangeJobPriority(job_id, opts.priority))
294
295
297 """ Adds the opcode timestamp to the given container.
298
299 """
300 if isinstance(ts, (tuple, list)):
301 container.append((name, FormatTimestamp(ts), "opcode_timestamp"))
302 else:
303 container.append((name, "N/A", "opcode_timestamp"))
304
305
307 """ Calculates the delta between two timestamps.
308
309 """
310 return to_ts[0] - from_ts[0] + (to_ts[1] - from_ts[1]) / 1000000.0
311
312
314 """ Adds the job timestamp to the given container.
315
316 @param prior_ts: The timestamp used to calculate the amount of time that
317 passed since the given timestamp.
318
319 """
320 if ts is not None:
321 delta = ""
322 if prior_ts is not None:
323 delta = " (delta %.6fs)" % _CalcDelta(prior_ts, ts)
324 output = "%s%s" % (FormatTimestamp(ts), delta)
325 container.append((name, output, "job_timestamp"))
326 else:
327 container.append((name, "unknown (%s)" % str(ts), "job_timestamp"))
328
329
def ShowJobs(opts, args):
  """Show detailed information about jobs.

  @param opts: the command line options selected by the user (not used)
  @type args: list
  @param args: should contain the job IDs to be queried
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if a job ID is not a valid integer
  @raise errors.ProgrammerError: if a job reports a status that is not
      listed in L{_USER_JOB_STATUS}

  """
  selected_fields = [
    "id", "status", "ops", "opresult", "opstatus", "oplog",
    "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
    ]

  qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
  cl = GetClient()
  result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data

  job_info_container = []

  for entry in result:
    # Every field arrives as a (result status, value) pair, in the order of
    # selected_fields; only the result status of "status" is looked at
    ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
     (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
     (_, start_ts), (_, end_ts)) = entry

    # Anything but a normal result is reported as a missing job
    if rs_status != constants.RS_NORMAL:
      job_info_container.append("Job ID %s not found" % job_id)
      continue

    job_info = [("Job ID", job_id)]

    if status in _USER_JOB_STATUS:
      status = _USER_JOB_STATUS[status]
    else:
      raise errors.ProgrammerError("Unknown job status code '%s'" % status)

    job_info.append(("Status", status))

    _ListJobTimestamp("Received", recv_ts, job_info)
    _ListJobTimestamp("Processing start", start_ts, job_info, prior_ts=recv_ts)
    _ListJobTimestamp("Processing end", end_ts, job_info, prior_ts=start_ts)

    if end_ts is not None and recv_ts is not None:
      job_info.append(("Total processing time", "%.6f seconds" %
                       _CalcDelta(recv_ts, end_ts)))
    else:
      job_info.append(("Total processing time", "N/A"))

    opcode_container = []
    # The per-opcode lists are walked in lockstep. The loop variables use
    # their own names so that the job-level "result" and "status" are not
    # rebound.
    per_opcode = zip(ops, opresult, opstatus, oplog, opstart, opexec, opend)
    for (opcode, op_result, op_status, log, s_ts, x_ts, e_ts) in per_opcode:
      opcode_info = []
      opcode_info.append(("Opcode", opcode["OP_ID"]))
      opcode_info.append(("Status", op_status))

      _ListOpcodeTimestamp("Processing start", s_ts, opcode_info)
      _ListOpcodeTimestamp("Execution start", x_ts, opcode_info)
      _ListOpcodeTimestamp("Processing end", e_ts, opcode_info)

      opcode_info.append(("Input fields", opcode))
      opcode_info.append(("Result", op_result))

      exec_log_container = []
      for serial, log_ts, log_type, log_msg in log:
        time_txt = FormatTimestamp(log_ts)
        encoded = FormatLogMessage(log_type, log_msg)

        exec_log_info = [
          ("Time", time_txt),
          ("Content", (serial, log_type, encoded,)),
          ]
        exec_log_container.append(exec_log_info)
      opcode_info.append(("Execution log", exec_log_container))

      opcode_container.append(opcode_info)

    job_info.append(("Opcodes", opcode_container))
    job_info_container.append(job_info)

  PrintGenericInfo(job_info_container)

  return 0
418
419
def WatchJob(opts, args):
  """Follow a job and print its output as it arrives.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling raised a
      L{errors.GenericError}, in which case the code computed by
      L{cli.FormatError} is returned

  """
  job_id = args[0]

  # Header line, underlined with dashes of the same length
  msg = ("Output from job %s follows" % job_id)
  ToStdout(msg)
  ToStdout("-" * len(msg))

  retcode = 0
  try:
    cli.PollJob(job_id)
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
444
445
def WaitJob(opts, args):
  """Wait for a job to finish, not producing any output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling raised a
      L{errors.GenericError}, in which case the code computed by
      L{cli.FormatError} is returned

  """
  job_id = args[0]

  retcode = 0
  try:
    # The feedback function discards everything it is given
    cli.PollJob(job_id, feedback_fn=lambda _: None)
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
466
def _StatusFilterOpt(flag, statuses, help_text):
  """Creates an option storing a collection of job statuses.

  All options built here share the destination C{status_filter}, which stays
  at C{None} unless one of the flags is given.

  @param flag: the option string, e.g. C{--running}
  @param statuses: value stored in C{status_filter} when the flag is used
  @param help_text: help string of the option

  """
  return cli_option(flag, default=None, action="store_const",
                    dest="status_filter", const=statuses, help=help_text)


_KILL_OPT = cli_option("--kill", default=False, action="store_true",
                       dest="kill", help="Kill running jobs with SIGKILL")

_YES_DOIT_OPT = cli_option("--yes-do-it", "--ya-rly", dest="yes_do_it",
                           help="Really use --kill", action="store_true")

_PENDING_OPT = _StatusFilterOpt(
  "--pending", constants.JOBS_PENDING,
  "Select jobs pending execution or being cancelled")

_RUNNING_OPT = _StatusFilterOpt(
  "--running", frozenset([constants.JOB_STATUS_RUNNING]),
  "Show jobs currently running only")

_ERROR_OPT = _StatusFilterOpt(
  "--error", frozenset([constants.JOB_STATUS_ERROR]),
  "Show failed jobs only")

_FINISHED_OPT = _StatusFilterOpt(
  "--finished", constants.JOBS_FINALIZED,
  "Show finished jobs only")

_ARCHIVED_OPT = cli_option(
  "--archived", default=False, action="store_true", dest="archived",
  help="Include archived jobs in list (slow and expensive)")

_QUEUED_OPT = _StatusFilterOpt(
  "--queued", frozenset([constants.JOB_STATUS_QUEUED]),
  "Select queued jobs only")

_WAITING_OPT = _StatusFilterOpt(
  "--waiting", frozenset([constants.JOB_STATUS_WAITING]),
  "Select waiting jobs only")
523
524
# Command table, keyed by sub-command name. Each value is a 5-tuple:
# (handler function, positional argument definitions, list of options,
# usage synopsis, description).
# NOTE(review): the table is presumably consumed by the generic command
# dispatcher in ganeti.cli, which is not visible here -- confirm there before
# changing the tuple layout.
commands = {
  "list": (
    ListJobs, [ArgJobId()],
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
     _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
    "[job_id ...]",
    "Lists the jobs and their status. The available fields can be shown"
    " using the \"list-fields\" command (see the man page for details)."
    " The default field list is (in order): %s." %
    utils.CommaJoin(_LIST_DEF_FIELDS)),
  "list-fields": (
    ListJobFields, [ArgUnknown()],
    [NOHDR_OPT, SEP_OPT],
    "[fields...]",
    "Lists all available fields for jobs"),
  "archive": (
    ArchiveJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]", "Archive specified jobs"),
  "autoarchive": (
    AutoArchiveJobs,
    [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
    [],
    "<age>", "Auto archive jobs older than the given age"),
  # Job IDs are optional for "cancel" and "change-priority" because jobs can
  # be selected with a status option instead; _MultiJobAction enforces that
  # exactly one of the two is used
  "cancel": (
    CancelJobs, [ArgJobId()],
    [FORCE_OPT, _KILL_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT,
     _YES_DOIT_OPT],
    "{[--force] [--kill --yes-do-it] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Cancel jobs"),
  "info": (
    ShowJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]",
    "Show detailed information about the specified jobs"),
  "wait": (
    WaitJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Wait for a job to finish"),
  "watch": (
    WatchJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Follows a job and prints its output as it arrives"),
  "change-priority": (
    ChangePriority, [ArgJobId()],
    [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
    "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Change the priority of jobs"),
  }
572
573
574
# Alternative command names, mapping an alias to a key of "commands"
aliases = {
  "show": "info",
  }
578
579
582