Package ganeti :: Package client :: Module gnt_job
[hide private]
[frames | no frames]

Source Code for Module ganeti.client.gnt_job

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """Job related commands""" 
 31   
 32  # pylint: disable=W0401,W0613,W0614,C0103 
 33  # W0401: Wildcard import ganeti.cli 
 34  # W0613: Unused argument, since all functions follow the same API 
 35  # W0614: Unused import %s from wildcard import (since we need cli) 
 36  # C0103: Invalid name gnt-job 
 37   
 38  from ganeti.cli import * 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42  from ganeti import cli 
 43  from ganeti import qlang 
 44   
 45   
  46  #: default list of fields for L{ListJobs} 
  47  _LIST_DEF_FIELDS = ["id", "status", "summary"] 
  48   
  49  #: map converting the job status constants to user-visible 
  50  #: names 
  51  _USER_JOB_STATUS = { 
  52    constants.JOB_STATUS_QUEUED: "queued", 
  53    constants.JOB_STATUS_WAITING: "waiting", 
  54    constants.JOB_STATUS_CANCELING: "canceling", 
  55    constants.JOB_STATUS_RUNNING: "running", 
  56    constants.JOB_STATUS_CANCELED: "canceled", 
  57    constants.JOB_STATUS_SUCCESS: "success", 
  58    constants.JOB_STATUS_ERROR: "error", 
  59    } 
 60   
 61   
def _FormatStatus(value):
  """Translates a job status constant into its user-visible name.

  @param value: job status constant; must be a key of L{_USER_JOB_STATUS}
  @return: the user-visible name of the status
  @raise errors.ProgrammerError: if the status is not a known one

  """
  if value not in _USER_JOB_STATUS:
    raise errors.ProgrammerError("Unknown job status code '%s'" % value)

  return _USER_JOB_STATUS[value]
70 71
72 -def _FormatSummary(value):
73 """Formats a job's summary. Takes possible non-ascii encoding into account. 74 75 """ 76 return ','.encode('utf-8').join(item.encode('utf-8') for item in value)
#: Formatting overrides used when printing job query results; maps a field
#: name to a tuple whose first element is the function formatting the raw
#: field value (the second element is handed on unchanged to the consumer
#: of the C{format_override} argument)
_JOB_LIST_FORMAT = {
  "status": (_FormatStatus, False),
  "summary": (_FormatSummary, False),
  }
# The per-opcode timestamp fields hold one timestamp per opcode, so each of
# them is formatted individually. A list comprehension is used instead of
# map() so that the result is a real list on Python 3 as well, where map()
# only returns a lazy iterator.
_JOB_LIST_FORMAT.update(dict.fromkeys(
  ["opstart", "opexec", "opend"],
  (lambda value: [FormatTimestamp(ts) for ts in value], None)))
89 -def _ParseJobIds(args):
90 """Parses a list of string job IDs into integers. 91 92 @param args: list of strings 93 @return: list of integers 94 @raise OpPrereqError: in case of invalid values 95 96 """ 97 try: 98 return [int(a) for a in args] 99 except (ValueError, TypeError), err: 100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err, 101 errors.ECODE_INVAL)
102 103
def ListJobs(opts, args):
  """Lists jobs and their properties.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: job IDs given on the command line (may be empty); passed
      through to L{GenericList}
  @rtype: int
  @return: the desired exit code

  """
  fields = ParseFields(opts.output, _LIST_DEF_FIELDS)

  # --archived makes sure the "archived" field is part of the selection
  if opts.archived and "archived" not in fields:
    fields.append("archived")

  status_filter = qlang.MakeSimpleFilter("status", opts.status_filter)

  client = GetClient()

  return GenericList(constants.QR_JOB, fields, args, None,
                     opts.separator, not opts.no_headers,
                     format_override=_JOB_LIST_FORMAT,
                     verbose=opts.verbose,
                     force_filter=opts.force_filter,
                     namefield="id",
                     qfilter=status_filter,
                     isnumeric=True,
                     cl=client)
128 129
def ListJobFields(opts, args):
  """Lists the fields available for job queries.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: fields to list, or empty for all
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()

  return GenericListFields(constants.QR_JOB, args,
                           opts.separator, not opts.no_headers,
                           cl=client)
144 145
def ArchiveJobs(opts, args):
  """Archives the given jobs one by one.

  A failure to archive a job is reported on stderr and does not stop the
  processing of the remaining jobs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be archived
  @rtype: int
  @return: 0 if all jobs were archived, 1 if at least one failed

  """
  cl = GetClient()

  exit_code = 0
  for job_id in args:
    archived = cl.ArchiveJob(job_id)
    if archived:
      continue

    ToStderr("Failed to archive job with ID '%s'", job_id)
    exit_code = 1

  return exit_code
165 166
def AutoArchiveJobs(opts, args):
  """Archives jobs depending on their age.

  Either the jobs older than the given age are archived or, if the
  keyword I{all} is passed, all of them.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the age as a time spec
      that can be parsed by L{ganeti.cli.ParseTimespec} or the
      keyword I{all}, which will cause all jobs to be archived
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  age_spec = args[0]

  # The keyword "all" is sent to the client as an age of -1
  age = -1 if age_spec == "all" else ParseTimespec(age_spec)

  (num_archived, num_left) = cl.AutoArchiveJobs(age)
  ToStdout("Archived %s jobs, %s unchecked left", num_archived, num_left)

  return 0
195 196
def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
  """Applies a function to multiple jobs.

  The jobs are selected either through explicit job IDs or through a
  status filter, but never both. When a status filter is used and
  C{opts.force} is not set, the matching jobs are printed and the user is
  asked for confirmation first.

  @param opts: Command line options
  @type args: list
  @param args: Job IDs
  @param cl: client to use; L{GetClient} is called if this is C{None}
  @param stdout_fn: function used for printing; L{ToStdout} is used if
      this is C{None}
  @param ask_fn: function used to ask for confirmation; L{AskUser} is used
      if this is C{None}
  @param question: question passed to C{ask_fn}
  @param action_fn: callable invoked as C{action_fn(cl, job_id)} for every
      job; must return a C{(success, message)} tuple
  @rtype: int
  @return: Exit code
  @raise errors.OpPrereqError: if not exactly one of job IDs and status
      filter was given, or if no job matches the status filter

  """
  if cl is None:
    cl = GetClient()

  if stdout_fn is None:
    stdout_fn = ToStdout

  if ask_fn is None:
    ask_fn = AskUser

  exit_code = constants.EXIT_SUCCESS

  # Exactly one of the two selection methods must be in use
  if bool(args) == (opts.status_filter is not None):
    raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
                               " specified and never both", errors.ECODE_INVAL)

  if opts.status_filter is None:
    job_ids = args
  else:
    response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
                        qlang.MakeSimpleFilter("status", opts.status_filter))

    # Each row holds one (status, value) pair per requested field; only the
    # value of the "id" field is needed
    job_ids = [job_id for ((_, job_id), _, _) in response.data]
    if not job_ids:
      raise errors.OpPrereqError("No jobs with the requested status have been"
                                 " found", errors.ECODE_STATE)

    if not opts.force:
      (_, table) = FormatQueryResult(response, header=True,
                                     format_override=_JOB_LIST_FORMAT)
      for line in table:
        stdout_fn(line)

      if not ask_fn(question):
        return constants.EXIT_CONFIRMATION

  for job_id in job_ids:
    (success, msg) = action_fn(cl, job_id)

    if not success:
      exit_code = constants.EXIT_FAILURE

    # The message is printed for successful and failed actions alike
    stdout_fn(msg)

  return exit_code
251 252
def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
  """Cancels jobs, passing the --kill flag on to the client if given.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be cancelled
  @param cl: client to use; handed to L{_MultiJobAction}
  @param _stdout_fn: function used for printing
  @param _ask_fn: function used to ask for confirmation
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if --kill is used without --yes-do-it

  """
  if opts.kill:
    if not opts.yes_do_it:
      raise errors.OpPrereqError("The --kill option must be confirmed"
                                 " with --yes-do-it", errors.ECODE_INVAL)
    action_name = "KILL"
  else:
    action_name = "Cancel"

  question = "%s job(s) listed above?" % action_name

  def _CancelFn(client, job_id):
    return client.CancelJob(job_id, kill=opts.kill)

  return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
                         question, _CancelFn)
274 275
def ChangePriority(opts, args):
  """Changes the priority of jobs.

  @param opts: Command line options
  @type args: list
  @param args: Job IDs
  @rtype: int
  @return: Exit code; L{constants.EXIT_FAILURE} if no priority was given

  """
  if opts.priority is None:
    ToStderr("--priority option must be given.")
    return constants.EXIT_FAILURE

  def _ChangeFn(client, job_id):
    return client.ChangeJobPriority(job_id, opts.priority)

  return _MultiJobAction(opts, args, None, None, None,
                         "Change priority of job(s) listed above?",
                         _ChangeFn)
294 295
296 -def _ListOpcodeTimestamp(name, ts, container):
297 """ Adds the opcode timestamp to the given container. 298 299 """ 300 if isinstance(ts, (tuple, list)): 301 container.append((name, FormatTimestamp(ts), "opcode_timestamp")) 302 else: 303 container.append((name, "N/A", "opcode_timestamp"))
304 305
306 -def _CalcDelta(from_ts, to_ts):
307 """ Calculates the delta between two timestamps. 308 309 """ 310 return to_ts[0] - from_ts[0] + (to_ts[1] - from_ts[1]) / 1000000.0
311 312
313 -def _ListJobTimestamp(name, ts, container, prior_ts=None):
314 """ Adds the job timestamp to the given container. 315 316 @param prior_ts: The timestamp used to calculate the amount of time that 317 passed since the given timestamp. 318 319 """ 320 if ts is not None: 321 delta = "" 322 if prior_ts is not None: 323 delta = " (delta %.6fs)" % _CalcDelta(prior_ts, ts) 324 output = "%s%s" % (FormatTimestamp(ts), delta) 325 container.append((name, output, "job_timestamp")) 326 else: 327 container.append((name, "unknown (%s)" % str(ts), "job_timestamp"))
328 329
def ShowJobs(opts, args):
  """Shows detailed information about jobs.

  For every job its status, its timestamps and, per opcode, the status,
  timestamps, input, result and execution log are collected and then
  printed via L{PrintGenericInfo}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be queried
  @rtype: int
  @return: the desired exit code
  @raise errors.ProgrammerError: if a job has an unknown status

  """
  # The order of the fields must match the unpacking of the rows below
  fields = [
    "id", "status", "ops", "opresult", "opstatus", "oplog",
    "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
    ]

  id_filter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
  client = GetClient()
  rows = client.Query(constants.QR_JOB, fields, id_filter).data

  all_jobs = []

  for row in rows:
    # Each column is a (result status, value) pair
    ((_, job_id), (rs_status, job_status), (_, ops), (_, opresult),
     (_, opstatus), (_, oplog), (_, opstart), (_, opexec), (_, opend),
     (_, recv_ts), (_, start_ts), (_, end_ts)) = row

    # Detect non-normal results
    if rs_status != constants.RS_NORMAL:
      all_jobs.append("Job ID %s not found" % job_id)
      continue

    if job_status not in _USER_JOB_STATUS:
      raise errors.ProgrammerError("Unknown job status code '%s'" %
                                   job_status)

    # Container for produced data
    job_info = [
      ("Job ID", job_id),
      ("Status", _USER_JOB_STATUS[job_status]),
      ]

    _ListJobTimestamp("Received", recv_ts, job_info)
    _ListJobTimestamp("Processing start", start_ts, job_info,
                      prior_ts=recv_ts)
    _ListJobTimestamp("Processing end", end_ts, job_info,
                      prior_ts=start_ts)

    if end_ts is None or recv_ts is None:
      total_time = "N/A"
    else:
      total_time = "%.6f seconds" % _CalcDelta(recv_ts, end_ts)
    job_info.append(("Total processing time", total_time))

    opcodes = []
    per_opcode = zip(ops, opresult, opstatus, oplog, opstart, opexec, opend)
    for (opcode, op_result, op_status, op_log, s_ts, x_ts, e_ts) in per_opcode:
      opcode_info = [
        ("Opcode", opcode["OP_ID"]),
        ("Status", op_status),
        ]

      _ListOpcodeTimestamp("Processing start", s_ts, opcode_info)
      _ListOpcodeTimestamp("Execution start", x_ts, opcode_info)
      _ListOpcodeTimestamp("Processing end", e_ts, opcode_info)

      opcode_info.append(("Input fields", opcode))
      opcode_info.append(("Result", op_result))

      # Time and content are kept as two separate entries on purpose: this
      # keeps the output brief for multiple log entries, and exposing the
      # data as a 4-tuple is not possible as the time contains a colon,
      # causing some YAML parsers to fail.
      exec_log = [
        [
          ("Time", FormatTimestamp(log_ts)),
          ("Content", (serial, log_type,
                       FormatLogMessage(log_type, log_msg),)),
        ]
        for (serial, log_ts, log_type, log_msg) in op_log
        ]
      opcode_info.append(("Execution log", exec_log))

      opcodes.append(opcode_info)

    job_info.append(("Opcodes", opcodes))
    all_jobs.append(job_info)

  PrintGenericInfo(all_jobs)

  return 0
418 419
def WatchJob(opts, args):
  """Follow a job and print its output as it arrives.

  A header naming the job is printed first, then the job is polled via
  L{cli.PollJob}. A failure of the job is reported on stderr.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling raised an error, in
      which case the code computed by L{cli.FormatError} is returned

  """
  job_id = args[0]

  msg = ("Output from job %s follows" % job_id)
  ToStdout(msg)
  ToStdout("-" * len(msg))

  retcode = 0
  try:
    cli.PollJob(job_id)
  # "except ..., err" only parses on Python 2; the "as" form is accepted by
  # Python 2.6 and newer, including Python 3
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
444 445
def WaitJob(opts, args):
  """Wait for a job to finish, not producing any output.

  The job is polled via L{cli.PollJob} with a feedback function that
  discards everything it is given. A failure of the job is still reported
  on stderr.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling raised an error, in
      which case the code computed by L{cli.FormatError} is returned

  """
  job_id = args[0]

  retcode = 0
  try:
    cli.PollJob(job_id, feedback_fn=lambda _: None)
  # "except ..., err" only parses on Python 2; the "as" form is accepted by
  # Python 2.6 and newer, including Python 3
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
#: --kill flag, stored in C{opts.kill}; read by L{CancelJobs}
_KILL_OPT = \
  cli_option("--kill", default=False,
             action="store_true", dest="kill",
             help="Kill running jobs with SIGKILL")

#: confirmation flag which L{CancelJobs} requires whenever --kill is used
_YES_DOIT_OPT = cli_option("--yes-do-it", "--ya-rly", dest="yes_do_it",
                           help="Really use --kill", action="store_true")

# All status selection options below (--pending, --running, --error,
# --finished, --queued and --waiting) store their constant into the same
# destination, C{opts.status_filter}, which defaults to None.

_PENDING_OPT = \
  cli_option("--pending", default=None,
             action="store_const", dest="status_filter",
             const=constants.JOBS_PENDING,
             help="Select jobs pending execution or being cancelled")

_RUNNING_OPT = \
  cli_option("--running", default=None,
             action="store_const", dest="status_filter",
             const=frozenset([
               constants.JOB_STATUS_RUNNING,
               ]),
             help="Show jobs currently running only")

_ERROR_OPT = \
  cli_option("--error", default=None,
             action="store_const", dest="status_filter",
             const=frozenset([
               constants.JOB_STATUS_ERROR,
               ]),
             help="Show failed jobs only")

_FINISHED_OPT = \
  cli_option("--finished", default=None,
             action="store_const", dest="status_filter",
             const=constants.JOBS_FINALIZED,
             help="Show finished jobs only")

#: --archived flag, stored in C{opts.archived}; read by L{ListJobs}
_ARCHIVED_OPT = \
  cli_option("--archived", default=False,
             action="store_true", dest="archived",
             help="Include archived jobs in list (slow and expensive)")

_QUEUED_OPT = \
  cli_option("--queued", default=None,
             action="store_const", dest="status_filter",
             const=frozenset([
               constants.JOB_STATUS_QUEUED,
               ]),
             help="Select queued jobs only")

_WAITING_OPT = \
  cli_option("--waiting", default=None,
             action="store_const", dest="status_filter",
             const=frozenset([
               constants.JOB_STATUS_WAITING,
               ]),
             help="Select waiting jobs only")


#: dictionary with the available commands; every value is a tuple made of
#: the function implementing the command, the list of argument definitions,
#: the list of accepted options, the usage string and the description
#: (handed to L{GenericMain} by L{Main})
commands = {
  "list": (
    ListJobs, [ArgJobId()],
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
     _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
    "[job_id ...]",
    "Lists the jobs and their status. The available fields can be shown"
    " using the \"list-fields\" command (see the man page for details)."
    " The default field list is (in order): %s." %
    utils.CommaJoin(_LIST_DEF_FIELDS)),
  "list-fields": (
    ListJobFields, [ArgUnknown()],
    [NOHDR_OPT, SEP_OPT],
    "[fields...]",
    "Lists all available fields for jobs"),
  "archive": (
    ArchiveJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]", "Archive specified jobs"),
  "autoarchive": (
    AutoArchiveJobs,
    [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
    [],
    "<age>", "Auto archive jobs older than the given age"),
  "cancel": (
    CancelJobs, [ArgJobId()],
    [FORCE_OPT, _KILL_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT,
     _YES_DOIT_OPT],
    "{[--force] [--kill --yes-do-it] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Cancel jobs"),
  "info": (
    ShowJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]",
    "Show detailed information about the specified jobs"),
  "wait": (
    WaitJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Wait for a job to finish"),
  "watch": (
    WatchJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Follows a job and prints its output as it arrives"),
  "change-priority": (
    ChangePriority, [ArgJobId()],
    [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
    "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Change the priority of jobs"),
  }


#: dictionary with aliases for commands
aliases = {
  "show": "info",
  }
def Main():
  """Runs the gnt-job command line handling.

  @return: whatever L{GenericMain} returns for the L{commands} and
      L{aliases} of this module

  """
  exit_status = GenericMain(commands, aliases=aliases)

  return exit_status
582