Package ganeti :: Package client :: Module gnt_job
[hide private]
[frames] | no frames]

Source Code for Module ganeti.client.gnt_job

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21  """Job related commands""" 
 22   
 23  # pylint: disable=W0401,W0613,W0614,C0103 
 24  # W0401: Wildcard import ganeti.cli 
 25  # W0613: Unused argument, since all functions follow the same API 
 26  # W0614: Unused import %s from wildcard import (since we need cli) 
 27  # C0103: Invalid name gnt-job 
 28   
 29  from ganeti.cli import * 
 30  from ganeti import constants 
 31  from ganeti import errors 
 32  from ganeti import utils 
 33  from ganeti import cli 
 34  from ganeti import qlang 
 35   
 36   
 37  #: default list of fields for L{ListJobs} 
 38  _LIST_DEF_FIELDS = ["id", "status", "summary"] 
 39   
 40  #: map converting the job status contants to user-visible 
 41  #: names 
 42  _USER_JOB_STATUS = { 
 43    constants.JOB_STATUS_QUEUED: "queued", 
 44    constants.JOB_STATUS_WAITING: "waiting", 
 45    constants.JOB_STATUS_CANCELING: "canceling", 
 46    constants.JOB_STATUS_RUNNING: "running", 
 47    constants.JOB_STATUS_CANCELED: "canceled", 
 48    constants.JOB_STATUS_SUCCESS: "success", 
 49    constants.JOB_STATUS_ERROR: "error", 
 50    } 
 51   
 52   
53 -def _FormatStatus(value):
54 """Formats a job status. 55 56 """ 57 try: 58 return _USER_JOB_STATUS[value] 59 except KeyError: 60 raise errors.ProgrammerError("Unknown job status code '%s'" % value)
61 62
63 -def _FormatSummary(value):
64 """Formats a job's summary. Takes possible non-ascii encoding into account. 65 66 """ 67 return ','.encode('utf-8').join(item.encode('utf-8') for item in value)
68 69 70 _JOB_LIST_FORMAT = { 71 "status": (_FormatStatus, False), 72 "summary": (_FormatSummary, False), 73 } 74 _JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"], 75 (lambda value: map(FormatTimestamp, 76 value), 77 None))) 78 79
80 -def _ParseJobIds(args):
81 """Parses a list of string job IDs into integers. 82 83 @param args: list of strings 84 @return: list of integers 85 @raise OpPrereqError: in case of invalid values 86 87 """ 88 try: 89 return [int(a) for a in args] 90 except (ValueError, TypeError), err: 91 raise errors.OpPrereqError("Invalid job ID passed: %s" % err, 92 errors.ECODE_INVAL)
93 94
95 -def ListJobs(opts, args):
96 """List the jobs 97 98 @param opts: the command line options selected by the user 99 @type args: list 100 @param args: should be an empty list 101 @rtype: int 102 @return: the desired exit code 103 104 """ 105 selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) 106 107 if opts.archived and "archived" not in selected_fields: 108 selected_fields.append("archived") 109 110 qfilter = qlang.MakeSimpleFilter("status", opts.status_filter) 111 112 cl = GetClient(query=True) 113 114 return GenericList(constants.QR_JOB, selected_fields, args, None, 115 opts.separator, not opts.no_headers, 116 format_override=_JOB_LIST_FORMAT, verbose=opts.verbose, 117 force_filter=opts.force_filter, namefield="id", 118 qfilter=qfilter, isnumeric=True, cl=cl)
119 120
121 -def ListJobFields(opts, args):
122 """List job fields. 123 124 @param opts: the command line options selected by the user 125 @type args: list 126 @param args: fields to list, or empty for all 127 @rtype: int 128 @return: the desired exit code 129 130 """ 131 cl = GetClient(query=True) 132 133 return GenericListFields(constants.QR_JOB, args, opts.separator, 134 not opts.no_headers, cl=cl)
135 136
137 -def ArchiveJobs(opts, args):
138 """Archive jobs. 139 140 @param opts: the command line options selected by the user 141 @type args: list 142 @param args: should contain the job IDs to be archived 143 @rtype: int 144 @return: the desired exit code 145 146 """ 147 client = GetClient() 148 149 rcode = 0 150 for job_id in args: 151 if not client.ArchiveJob(job_id): 152 ToStderr("Failed to archive job with ID '%s'", job_id) 153 rcode = 1 154 155 return rcode
156 157
158 -def AutoArchiveJobs(opts, args):
159 """Archive jobs based on age. 160 161 This will archive jobs based on their age, or all jobs if a 'all' is 162 passed. 163 164 @param opts: the command line options selected by the user 165 @type args: list 166 @param args: should contain only one element, the age as a time spec 167 that can be parsed by L{ganeti.cli.ParseTimespec} or the 168 keyword I{all}, which will cause all jobs to be archived 169 @rtype: int 170 @return: the desired exit code 171 172 """ 173 client = GetClient() 174 175 age = args[0] 176 177 if age == "all": 178 age = -1 179 else: 180 age = ParseTimespec(age) 181 182 (archived_count, jobs_left) = client.AutoArchiveJobs(age) 183 ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left) 184 185 return 0
186 187
188 -def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
189 """Applies a function to multipe jobs. 190 191 @param opts: Command line options 192 @type args: list 193 @param args: Job IDs 194 @rtype: int 195 @return: Exit code 196 197 """ 198 if cl is None: 199 cl = GetClient() 200 201 if stdout_fn is None: 202 stdout_fn = ToStdout 203 204 if ask_fn is None: 205 ask_fn = AskUser 206 207 result = constants.EXIT_SUCCESS 208 209 if bool(args) ^ (opts.status_filter is None): 210 raise errors.OpPrereqError("Either a status filter or job ID(s) must be" 211 " specified and never both", errors.ECODE_INVAL) 212 213 if opts.status_filter is not None: 214 response = cl.Query(constants.QR_JOB, ["id", "status", "summary"], 215 qlang.MakeSimpleFilter("status", opts.status_filter)) 216 217 jobs = [i for ((_, i), _, _) in response.data] 218 if not jobs: 219 raise errors.OpPrereqError("No jobs with the requested status have been" 220 " found", errors.ECODE_STATE) 221 222 if not opts.force: 223 (_, table) = FormatQueryResult(response, header=True, 224 format_override=_JOB_LIST_FORMAT) 225 for line in table: 226 stdout_fn(line) 227 228 if not ask_fn(question): 229 return constants.EXIT_CONFIRMATION 230 else: 231 jobs = args 232 233 for job_id in jobs: 234 (success, msg) = action_fn(cl, job_id) 235 236 if not success: 237 result = constants.EXIT_FAILURE 238 239 stdout_fn(msg) 240 241 return result
242 243
244 -def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
245 """Cancel not-yet-started jobs. 246 247 @param opts: the command line options selected by the user 248 @type args: list 249 @param args: should contain the job IDs to be cancelled 250 @rtype: int 251 @return: the desired exit code 252 253 """ 254 return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn, 255 "Cancel job(s) listed above?", 256 lambda cl, job_id: cl.CancelJob(job_id))
257 258
259 -def ChangePriority(opts, args):
260 """Change priority of jobs. 261 262 @param opts: Command line options 263 @type args: list 264 @param args: Job IDs 265 @rtype: int 266 @return: Exit code 267 268 """ 269 if opts.priority is None: 270 ToStderr("--priority option must be given.") 271 return constants.EXIT_FAILURE 272 273 return _MultiJobAction(opts, args, None, None, None, 274 "Change priority of job(s) listed above?", 275 lambda cl, job_id: 276 cl.ChangeJobPriority(job_id, opts.priority))
277 278
279 -def ShowJobs(opts, args):
280 """Show detailed information about jobs. 281 282 @param opts: the command line options selected by the user 283 @type args: list 284 @param args: should contain the job IDs to be queried 285 @rtype: int 286 @return: the desired exit code 287 288 """ 289 def format_msg(level, text): 290 """Display the text indented.""" 291 ToStdout("%s%s", " " * level, text)
292 293 def result_helper(value): 294 """Format a result field in a nice way.""" 295 if isinstance(value, (tuple, list)): 296 return "[%s]" % utils.CommaJoin(value) 297 else: 298 return str(value) 299 300 selected_fields = [ 301 "id", "status", "ops", "opresult", "opstatus", "oplog", 302 "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts", 303 ] 304 305 qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args)) 306 cl = GetClient(query=True) 307 result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data 308 309 first = True 310 311 for entry in result: 312 if not first: 313 format_msg(0, "") 314 else: 315 first = False 316 317 ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus), 318 (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts), 319 (_, start_ts), (_, end_ts)) = entry 320 321 # Detect non-normal results 322 if rs_status != constants.RS_NORMAL: 323 format_msg(0, "Job ID %s not found" % job_id) 324 continue 325 326 format_msg(0, "Job ID: %s" % job_id) 327 if status in _USER_JOB_STATUS: 328 status = _USER_JOB_STATUS[status] 329 else: 330 raise errors.ProgrammerError("Unknown job status code '%s'" % status) 331 332 format_msg(1, "Status: %s" % status) 333 334 if recv_ts is not None: 335 format_msg(1, "Received: %s" % FormatTimestamp(recv_ts)) 336 else: 337 format_msg(1, "Missing received timestamp (%s)" % str(recv_ts)) 338 339 if start_ts is not None: 340 if recv_ts is not None: 341 d1 = start_ts[0] - recv_ts[0] + (start_ts[1] - recv_ts[1]) / 1000000.0 342 delta = " (delta %.6fs)" % d1 343 else: 344 delta = "" 345 format_msg(1, "Processing start: %s%s" % 346 (FormatTimestamp(start_ts), delta)) 347 else: 348 format_msg(1, "Processing start: unknown (%s)" % str(start_ts)) 349 350 if end_ts is not None: 351 if start_ts is not None: 352 d2 = end_ts[0] - start_ts[0] + (end_ts[1] - start_ts[1]) / 1000000.0 353 delta = " (delta %.6fs)" % d2 354 else: 355 delta = "" 356 format_msg(1, "Processing end: %s%s" % 357 (FormatTimestamp(end_ts), delta)) 358 else: 359 format_msg(1, "Processing end: unknown (%s)" % str(end_ts)) 360 361 if end_ts is not None and recv_ts is not None: 362 d3 = end_ts[0] - recv_ts[0] + (end_ts[1] - recv_ts[1]) / 1000000.0 363 format_msg(1, "Total processing time: %.6f seconds" % d3) 364 else: 365 format_msg(1, "Total processing time: N/A") 366 format_msg(1, "Opcodes:") 367 for (opcode, result, status, log, s_ts, x_ts, e_ts) in \ 368 zip(ops, opresult, opstatus, oplog, opstart, opexec, opend): 369 format_msg(2, "%s" % opcode["OP_ID"]) 370 format_msg(3, "Status: %s" % status) 371 if isinstance(s_ts, (tuple, list)): 372 format_msg(3, "Processing start: %s" % FormatTimestamp(s_ts)) 373 else: 374 format_msg(3, "No processing start time") 375 if isinstance(x_ts, (tuple, list)): 376 format_msg(3, "Execution start: %s" % FormatTimestamp(x_ts)) 377 else: 378 format_msg(3, "No execution start time") 379 if isinstance(e_ts, (tuple, list)): 380 format_msg(3, "Processing end: %s" % FormatTimestamp(e_ts)) 381 else: 382 format_msg(3, "No processing end time") 383 format_msg(3, "Input fields:") 384 for key in utils.NiceSort(opcode.keys()): 385 if key == "OP_ID": 386 continue 387 val = opcode[key] 388 if isinstance(val, (tuple, list)): 389 val = ",".join([str(item) for item in val]) 390 format_msg(4, "%s: %s" % (key, val)) 391 if result is None: 392 format_msg(3, "No output data") 393 elif isinstance(result, (tuple, list)): 394 if not result: 395 format_msg(3, "Result: empty sequence") 396 else: 397 format_msg(3, "Result:") 398 for elem in result: 399 format_msg(4, result_helper(elem)) 400 elif isinstance(result, dict): 401 if not result: 402 format_msg(3, "Result: empty dictionary") 403 else: 404 format_msg(3, "Result:") 405 for key, val in result.iteritems(): 406 format_msg(4, "%s: %s" % (key, result_helper(val))) 407 else: 408 format_msg(3, "Result: %s" % result) 409 format_msg(3, "Execution log:") 410 for serial, log_ts, log_type, log_msg in log: 411 time_txt = FormatTimestamp(log_ts) 412 encoded = FormatLogMessage(log_type, log_msg) 413 format_msg(4, "%s:%s:%s %s" % (serial, time_txt, log_type, encoded)) 414 return 0 415 416
417 -def WatchJob(opts, args):
418 """Follow a job and print its output as it arrives. 419 420 @param opts: the command line options selected by the user 421 @type args: list 422 @param args: Contains the job ID 423 @rtype: int 424 @return: the desired exit code 425 426 """ 427 job_id = args[0] 428 429 msg = ("Output from job %s follows" % job_id) 430 ToStdout(msg) 431 ToStdout("-" * len(msg)) 432 433 retcode = 0 434 try: 435 cli.PollJob(job_id) 436 except errors.GenericError, err: 437 (retcode, job_result) = cli.FormatError(err) 438 ToStderr("Job %s failed: %s", job_id, job_result) 439 440 return retcode
441 442
443 -def WaitJob(opts, args):
444 """Wait for a job to finish, not producing any output. 445 446 @param opts: the command line options selected by the user 447 @type args: list 448 @param args: Contains the job ID 449 @rtype: int 450 @return: the desired exit code 451 452 """ 453 job_id = args[0] 454 455 retcode = 0 456 try: 457 cli.PollJob(job_id, feedback_fn=lambda _: None) 458 except errors.GenericError, err: 459 (retcode, job_result) = cli.FormatError(err) 460 ToStderr("Job %s failed: %s", job_id, job_result) 461 462 return retcode
463 464 465 _PENDING_OPT = \ 466 cli_option("--pending", default=None, 467 action="store_const", dest="status_filter", 468 const=constants.JOBS_PENDING, 469 help="Select jobs pending execution or being cancelled") 470 471 _RUNNING_OPT = \ 472 cli_option("--running", default=None, 473 action="store_const", dest="status_filter", 474 const=frozenset([ 475 constants.JOB_STATUS_RUNNING, 476 ]), 477 help="Show jobs currently running only") 478 479 _ERROR_OPT = \ 480 cli_option("--error", default=None, 481 action="store_const", dest="status_filter", 482 const=frozenset([ 483 constants.JOB_STATUS_ERROR, 484 ]), 485 help="Show failed jobs only") 486 487 _FINISHED_OPT = \ 488 cli_option("--finished", default=None, 489 action="store_const", dest="status_filter", 490 const=constants.JOBS_FINALIZED, 491 help="Show finished jobs only") 492 493 _ARCHIVED_OPT = \ 494 cli_option("--archived", default=False, 495 action="store_true", dest="archived", 496 help="Include archived jobs in list (slow and expensive)") 497 498 _QUEUED_OPT = \ 499 cli_option("--queued", default=None, 500 action="store_const", dest="status_filter", 501 const=frozenset([ 502 constants.JOB_STATUS_QUEUED, 503 ]), 504 help="Select queued jobs only") 505 506 _WAITING_OPT = \ 507 cli_option("--waiting", default=None, 508 action="store_const", dest="status_filter", 509 const=frozenset([ 510 constants.JOB_STATUS_WAITING, 511 ]), 512 help="Select waiting jobs only") 513 514 515 commands = { 516 "list": ( 517 ListJobs, [ArgJobId()], 518 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT, 519 _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT], 520 "[job_id ...]", 521 "Lists the jobs and their status. The available fields can be shown" 522 " using the \"list-fields\" command (see the man page for details)." 523 " The default field list is (in order): %s." % 524 utils.CommaJoin(_LIST_DEF_FIELDS)), 525 "list-fields": ( 526 ListJobFields, [ArgUnknown()], 527 [NOHDR_OPT, SEP_OPT], 528 "[fields...]", 529 "Lists all available fields for jobs"), 530 "archive": ( 531 ArchiveJobs, [ArgJobId(min=1)], [], 532 "<job-id> [<job-id> ...]", "Archive specified jobs"), 533 "autoarchive": ( 534 AutoArchiveJobs, 535 [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])], 536 [], 537 "<age>", "Auto archive jobs older than the given age"), 538 "cancel": ( 539 CancelJobs, [ArgJobId()], 540 [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], 541 "{[--force] {--pending | --queued | --waiting} |" 542 " <job-id> [<job-id> ...]}", 543 "Cancel jobs"), 544 "info": ( 545 ShowJobs, [ArgJobId(min=1)], [], 546 "<job-id> [<job-id> ...]", 547 "Show detailed information about the specified jobs"), 548 "wait": ( 549 WaitJob, [ArgJobId(min=1, max=1)], [], 550 "<job-id>", "Wait for a job to finish"), 551 "watch": ( 552 WatchJob, [ArgJobId(min=1, max=1)], [], 553 "<job-id>", "Follows a job and prints its output as it arrives"), 554 "change-priority": ( 555 ChangePriority, [ArgJobId()], 556 [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], 557 "--priority <priority> {[--force] {--pending | --queued | --waiting} |" 558 " <job-id> [<job-id> ...]}", 559 "Change the priority of jobs"), 560 } 561 562 563 #: dictionary with aliases for commands 564 aliases = { 565 "show": "info", 566 } 567 568
569 -def Main():
570 return GenericMain(commands, aliases=aliases)
571