Package ganeti :: Package client :: Module gnt_job
[hide private]
[frames] | no frames]

Source Code for Module ganeti.client.gnt_job

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """Job related commands""" 
 31   
 32  # pylint: disable=W0401,W0613,W0614,C0103 
 33  # W0401: Wildcard import ganeti.cli 
 34  # W0613: Unused argument, since all functions follow the same API 
 35  # W0614: Unused import %s from wildcard import (since we need cli) 
 36  # C0103: Invalid name gnt-job 
 37   
 38  from ganeti.cli import * 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42  from ganeti import cli 
 43  from ganeti import qlang 
 44   
 45   
 46  #: default list of fields for L{ListJobs} 
 47  _LIST_DEF_FIELDS = ["id", "status", "summary"] 
 48   
 49  #: map converting the job status contants to user-visible 
 50  #: names 
 51  _USER_JOB_STATUS = { 
 52    constants.JOB_STATUS_QUEUED: "queued", 
 53    constants.JOB_STATUS_WAITING: "waiting", 
 54    constants.JOB_STATUS_CANCELING: "canceling", 
 55    constants.JOB_STATUS_RUNNING: "running", 
 56    constants.JOB_STATUS_CANCELED: "canceled", 
 57    constants.JOB_STATUS_SUCCESS: "success", 
 58    constants.JOB_STATUS_ERROR: "error", 
 59    } 
 60   
 61   
62 -def _FormatStatus(value):
63 """Formats a job status. 64 65 """ 66 try: 67 return _USER_JOB_STATUS[value] 68 except KeyError: 69 raise errors.ProgrammerError("Unknown job status code '%s'" % value)
70 71
72 -def _FormatSummary(value):
73 """Formats a job's summary. Takes possible non-ascii encoding into account. 74 75 """ 76 return ','.encode('utf-8').join(item.encode('utf-8') for item in value)
77 78 79 _JOB_LIST_FORMAT = { 80 "status": (_FormatStatus, False), 81 "summary": (_FormatSummary, False), 82 } 83 _JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"], 84 (lambda value: map(FormatTimestamp, 85 value), 86 None))) 87 88
89 -def _ParseJobIds(args):
90 """Parses a list of string job IDs into integers. 91 92 @param args: list of strings 93 @return: list of integers 94 @raise OpPrereqError: in case of invalid values 95 96 """ 97 try: 98 return [int(a) for a in args] 99 except (ValueError, TypeError), err: 100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err, 101 errors.ECODE_INVAL)
102 103
104 -def ListJobs(opts, args):
105 """List the jobs 106 107 @param opts: the command line options selected by the user 108 @type args: list 109 @param args: should be an empty list 110 @rtype: int 111 @return: the desired exit code 112 113 """ 114 selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) 115 116 if opts.archived and "archived" not in selected_fields: 117 selected_fields.append("archived") 118 119 qfilter = qlang.MakeSimpleFilter("status", opts.status_filter) 120 121 cl = GetClient(query=True) 122 123 return GenericList(constants.QR_JOB, selected_fields, args, None, 124 opts.separator, not opts.no_headers, 125 format_override=_JOB_LIST_FORMAT, verbose=opts.verbose, 126 force_filter=opts.force_filter, namefield="id", 127 qfilter=qfilter, isnumeric=True, cl=cl)
128 129
130 -def ListJobFields(opts, args):
131 """List job fields. 132 133 @param opts: the command line options selected by the user 134 @type args: list 135 @param args: fields to list, or empty for all 136 @rtype: int 137 @return: the desired exit code 138 139 """ 140 cl = GetClient(query=True) 141 142 return GenericListFields(constants.QR_JOB, args, opts.separator, 143 not opts.no_headers, cl=cl)
144 145
146 -def ArchiveJobs(opts, args):
147 """Archive jobs. 148 149 @param opts: the command line options selected by the user 150 @type args: list 151 @param args: should contain the job IDs to be archived 152 @rtype: int 153 @return: the desired exit code 154 155 """ 156 client = GetClient() 157 158 rcode = 0 159 for job_id in args: 160 if not client.ArchiveJob(job_id): 161 ToStderr("Failed to archive job with ID '%s'", job_id) 162 rcode = 1 163 164 return rcode
165 166
167 -def AutoArchiveJobs(opts, args):
168 """Archive jobs based on age. 169 170 This will archive jobs based on their age, or all jobs if a 'all' is 171 passed. 172 173 @param opts: the command line options selected by the user 174 @type args: list 175 @param args: should contain only one element, the age as a time spec 176 that can be parsed by L{ganeti.cli.ParseTimespec} or the 177 keyword I{all}, which will cause all jobs to be archived 178 @rtype: int 179 @return: the desired exit code 180 181 """ 182 client = GetClient() 183 184 age = args[0] 185 186 if age == "all": 187 age = -1 188 else: 189 age = ParseTimespec(age) 190 191 (archived_count, jobs_left) = client.AutoArchiveJobs(age) 192 ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left) 193 194 return 0
195 196
197 -def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
198 """Applies a function to multipe jobs. 199 200 @param opts: Command line options 201 @type args: list 202 @param args: Job IDs 203 @rtype: int 204 @return: Exit code 205 206 """ 207 if cl is None: 208 cl = GetClient() 209 210 if stdout_fn is None: 211 stdout_fn = ToStdout 212 213 if ask_fn is None: 214 ask_fn = AskUser 215 216 result = constants.EXIT_SUCCESS 217 218 if bool(args) ^ (opts.status_filter is None): 219 raise errors.OpPrereqError("Either a status filter or job ID(s) must be" 220 " specified and never both", errors.ECODE_INVAL) 221 222 if opts.status_filter is not None: 223 response = cl.Query(constants.QR_JOB, ["id", "status", "summary"], 224 qlang.MakeSimpleFilter("status", opts.status_filter)) 225 226 jobs = [i for ((_, i), _, _) in response.data] 227 if not jobs: 228 raise errors.OpPrereqError("No jobs with the requested status have been" 229 " found", errors.ECODE_STATE) 230 231 if not opts.force: 232 (_, table) = FormatQueryResult(response, header=True, 233 format_override=_JOB_LIST_FORMAT) 234 for line in table: 235 stdout_fn(line) 236 237 if not ask_fn(question): 238 return constants.EXIT_CONFIRMATION 239 else: 240 jobs = args 241 242 for job_id in jobs: 243 (success, msg) = action_fn(cl, job_id) 244 245 if not success: 246 result = constants.EXIT_FAILURE 247 248 stdout_fn(msg) 249 250 return result
251 252
253 -def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
254 """Cancel not-yet-started jobs. 255 256 @param opts: the command line options selected by the user 257 @type args: list 258 @param args: should contain the job IDs to be cancelled 259 @rtype: int 260 @return: the desired exit code 261 262 """ 263 return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn, 264 "Cancel job(s) listed above?", 265 lambda cl, job_id: cl.CancelJob(job_id))
266 267
268 -def ChangePriority(opts, args):
269 """Change priority of jobs. 270 271 @param opts: Command line options 272 @type args: list 273 @param args: Job IDs 274 @rtype: int 275 @return: Exit code 276 277 """ 278 if opts.priority is None: 279 ToStderr("--priority option must be given.") 280 return constants.EXIT_FAILURE 281 282 return _MultiJobAction(opts, args, None, None, None, 283 "Change priority of job(s) listed above?", 284 lambda cl, job_id: 285 cl.ChangeJobPriority(job_id, opts.priority))
286 287
288 -def ShowJobs(opts, args):
289 """Show detailed information about jobs. 290 291 @param opts: the command line options selected by the user 292 @type args: list 293 @param args: should contain the job IDs to be queried 294 @rtype: int 295 @return: the desired exit code 296 297 """ 298 def format_msg(level, text): 299 """Display the text indented.""" 300 ToStdout("%s%s", " " * level, text)
301 302 def result_helper(value): 303 """Format a result field in a nice way.""" 304 if isinstance(value, (tuple, list)): 305 return "[%s]" % utils.CommaJoin(value) 306 else: 307 return str(value) 308 309 selected_fields = [ 310 "id", "status", "ops", "opresult", "opstatus", "oplog", 311 "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts", 312 ] 313 314 qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args)) 315 cl = GetClient(query=True) 316 result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data 317 318 first = True 319 320 for entry in result: 321 if not first: 322 format_msg(0, "") 323 else: 324 first = False 325 326 ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus), 327 (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts), 328 (_, start_ts), (_, end_ts)) = entry 329 330 # Detect non-normal results 331 if rs_status != constants.RS_NORMAL: 332 format_msg(0, "Job ID %s not found" % job_id) 333 continue 334 335 format_msg(0, "Job ID: %s" % job_id) 336 if status in _USER_JOB_STATUS: 337 status = _USER_JOB_STATUS[status] 338 else: 339 raise errors.ProgrammerError("Unknown job status code '%s'" % status) 340 341 format_msg(1, "Status: %s" % status) 342 343 if recv_ts is not None: 344 format_msg(1, "Received: %s" % FormatTimestamp(recv_ts)) 345 else: 346 format_msg(1, "Missing received timestamp (%s)" % str(recv_ts)) 347 348 if start_ts is not None: 349 if recv_ts is not None: 350 d1 = start_ts[0] - recv_ts[0] + (start_ts[1] - recv_ts[1]) / 1000000.0 351 delta = " (delta %.6fs)" % d1 352 else: 353 delta = "" 354 format_msg(1, "Processing start: %s%s" % 355 (FormatTimestamp(start_ts), delta)) 356 else: 357 format_msg(1, "Processing start: unknown (%s)" % str(start_ts)) 358 359 if end_ts is not None: 360 if start_ts is not None: 361 d2 = end_ts[0] - start_ts[0] + (end_ts[1] - start_ts[1]) / 1000000.0 362 delta = " (delta %.6fs)" % d2 363 else: 364 delta = "" 365 format_msg(1, "Processing end: %s%s" % 366 (FormatTimestamp(end_ts), delta)) 367 else: 368 format_msg(1, "Processing end: unknown (%s)" % str(end_ts)) 369 370 if end_ts is not None and recv_ts is not None: 371 d3 = end_ts[0] - recv_ts[0] + (end_ts[1] - recv_ts[1]) / 1000000.0 372 format_msg(1, "Total processing time: %.6f seconds" % d3) 373 else: 374 format_msg(1, "Total processing time: N/A") 375 format_msg(1, "Opcodes:") 376 for (opcode, result, status, log, s_ts, x_ts, e_ts) in \ 377 zip(ops, opresult, opstatus, oplog, opstart, opexec, opend): 378 format_msg(2, "%s" % opcode["OP_ID"]) 379 format_msg(3, "Status: %s" % status) 380 if isinstance(s_ts, (tuple, list)): 381 format_msg(3, "Processing start: %s" % FormatTimestamp(s_ts)) 382 else: 383 format_msg(3, "No processing start time") 384 if isinstance(x_ts, (tuple, list)): 385 format_msg(3, "Execution start: %s" % FormatTimestamp(x_ts)) 386 else: 387 format_msg(3, "No execution start time") 388 if isinstance(e_ts, (tuple, list)): 389 format_msg(3, "Processing end: %s" % FormatTimestamp(e_ts)) 390 else: 391 format_msg(3, "No processing end time") 392 format_msg(3, "Input fields:") 393 for key in utils.NiceSort(opcode.keys()): 394 if key == "OP_ID": 395 continue 396 val = opcode[key] 397 if isinstance(val, (tuple, list)): 398 val = ",".join([str(item) for item in val]) 399 format_msg(4, "%s: %s" % (key, val)) 400 if result is None: 401 format_msg(3, "No output data") 402 elif isinstance(result, (tuple, list)): 403 if not result: 404 format_msg(3, "Result: empty sequence") 405 else: 406 format_msg(3, "Result:") 407 for elem in result: 408 format_msg(4, result_helper(elem)) 409 elif isinstance(result, dict): 410 if not result: 411 format_msg(3, "Result: empty dictionary") 412 else: 413 format_msg(3, "Result:") 414 for key, val in result.iteritems(): 415 format_msg(4, "%s: %s" % (key, result_helper(val))) 416 else: 417 format_msg(3, "Result: %s" % result) 418 format_msg(3, "Execution log:") 419 for serial, log_ts, log_type, log_msg in log: 420 time_txt = FormatTimestamp(log_ts) 421 encoded = FormatLogMessage(log_type, log_msg) 422 format_msg(4, "%s:%s:%s %s" % (serial, time_txt, log_type, encoded)) 423 return 0 424 425
426 -def WatchJob(opts, args):
427 """Follow a job and print its output as it arrives. 428 429 @param opts: the command line options selected by the user 430 @type args: list 431 @param args: Contains the job ID 432 @rtype: int 433 @return: the desired exit code 434 435 """ 436 job_id = args[0] 437 438 msg = ("Output from job %s follows" % job_id) 439 ToStdout(msg) 440 ToStdout("-" * len(msg)) 441 442 retcode = 0 443 try: 444 cli.PollJob(job_id) 445 except errors.GenericError, err: 446 (retcode, job_result) = cli.FormatError(err) 447 ToStderr("Job %s failed: %s", job_id, job_result) 448 449 return retcode
450 451
452 -def WaitJob(opts, args):
453 """Wait for a job to finish, not producing any output. 454 455 @param opts: the command line options selected by the user 456 @type args: list 457 @param args: Contains the job ID 458 @rtype: int 459 @return: the desired exit code 460 461 """ 462 job_id = args[0] 463 464 retcode = 0 465 try: 466 cli.PollJob(job_id, feedback_fn=lambda _: None) 467 except errors.GenericError, err: 468 (retcode, job_result) = cli.FormatError(err) 469 ToStderr("Job %s failed: %s", job_id, job_result) 470 471 return retcode
472 473 474 _PENDING_OPT = \ 475 cli_option("--pending", default=None, 476 action="store_const", dest="status_filter", 477 const=constants.JOBS_PENDING, 478 help="Select jobs pending execution or being cancelled") 479 480 _RUNNING_OPT = \ 481 cli_option("--running", default=None, 482 action="store_const", dest="status_filter", 483 const=frozenset([ 484 constants.JOB_STATUS_RUNNING, 485 ]), 486 help="Show jobs currently running only") 487 488 _ERROR_OPT = \ 489 cli_option("--error", default=None, 490 action="store_const", dest="status_filter", 491 const=frozenset([ 492 constants.JOB_STATUS_ERROR, 493 ]), 494 help="Show failed jobs only") 495 496 _FINISHED_OPT = \ 497 cli_option("--finished", default=None, 498 action="store_const", dest="status_filter", 499 const=constants.JOBS_FINALIZED, 500 help="Show finished jobs only") 501 502 _ARCHIVED_OPT = \ 503 cli_option("--archived", default=False, 504 action="store_true", dest="archived", 505 help="Include archived jobs in list (slow and expensive)") 506 507 _QUEUED_OPT = \ 508 cli_option("--queued", default=None, 509 action="store_const", dest="status_filter", 510 const=frozenset([ 511 constants.JOB_STATUS_QUEUED, 512 ]), 513 help="Select queued jobs only") 514 515 _WAITING_OPT = \ 516 cli_option("--waiting", default=None, 517 action="store_const", dest="status_filter", 518 const=frozenset([ 519 constants.JOB_STATUS_WAITING, 520 ]), 521 help="Select waiting jobs only") 522 523 524 commands = { 525 "list": ( 526 ListJobs, [ArgJobId()], 527 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT, 528 _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT], 529 "[job_id ...]", 530 "Lists the jobs and their status. The available fields can be shown" 531 " using the \"list-fields\" command (see the man page for details)." 532 " The default field list is (in order): %s." % 533 utils.CommaJoin(_LIST_DEF_FIELDS)), 534 "list-fields": ( 535 ListJobFields, [ArgUnknown()], 536 [NOHDR_OPT, SEP_OPT], 537 "[fields...]", 538 "Lists all available fields for jobs"), 539 "archive": ( 540 ArchiveJobs, [ArgJobId(min=1)], [], 541 "<job-id> [<job-id> ...]", "Archive specified jobs"), 542 "autoarchive": ( 543 AutoArchiveJobs, 544 [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])], 545 [], 546 "<age>", "Auto archive jobs older than the given age"), 547 "cancel": ( 548 CancelJobs, [ArgJobId()], 549 [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], 550 "{[--force] {--pending | --queued | --waiting} |" 551 " <job-id> [<job-id> ...]}", 552 "Cancel jobs"), 553 "info": ( 554 ShowJobs, [ArgJobId(min=1)], [], 555 "<job-id> [<job-id> ...]", 556 "Show detailed information about the specified jobs"), 557 "wait": ( 558 WaitJob, [ArgJobId(min=1, max=1)], [], 559 "<job-id>", "Wait for a job to finish"), 560 "watch": ( 561 WatchJob, [ArgJobId(min=1, max=1)], [], 562 "<job-id>", "Follows a job and prints its output as it arrives"), 563 "change-priority": ( 564 ChangePriority, [ArgJobId()], 565 [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], 566 "--priority <priority> {[--force] {--pending | --queued | --waiting} |" 567 " <job-id> [<job-id> ...]}", 568 "Change the priority of jobs"), 569 } 570 571 572 #: dictionary with aliases for commands 573 aliases = { 574 "show": "info", 575 } 576 577
578 -def Main():
579 return GenericMain(commands, aliases=aliases)
580