Package ganeti :: Package client :: Module gnt_job
[hide private]
[frames | no frames]

Source Code for Module ganeti.client.gnt_job

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """Job related commands""" 
 31   
 32  # pylint: disable=W0401,W0613,W0614,C0103 
 33  # W0401: Wildcard import ganeti.cli 
 34  # W0613: Unused argument, since all functions follow the same API 
 35  # W0614: Unused import %s from wildcard import (since we need cli) 
 36  # C0103: Invalid name gnt-job 
 37   
 38  from ganeti.cli import * 
 39  from ganeti import constants 
 40  from ganeti import errors 
 41  from ganeti import utils 
 42  from ganeti import cli 
 43  from ganeti import qlang 
 44   
 45   
 46  #: default list of fields for L{ListJobs} 
 47  _LIST_DEF_FIELDS = ["id", "status", "summary"] 
 48   
 49  #: map converting the job status contants to user-visible 
 50  #: names 
 51  _USER_JOB_STATUS = { 
 52    constants.JOB_STATUS_QUEUED: "queued", 
 53    constants.JOB_STATUS_WAITING: "waiting", 
 54    constants.JOB_STATUS_CANCELING: "canceling", 
 55    constants.JOB_STATUS_RUNNING: "running", 
 56    constants.JOB_STATUS_CANCELED: "canceled", 
 57    constants.JOB_STATUS_SUCCESS: "success", 
 58    constants.JOB_STATUS_ERROR: "error", 
 59    } 
 60   
 61   
def _FormatStatus(value):
  """Translates a job status constant into its user-visible name.

  @param value: job status code, expected to be a key of
      L{_USER_JOB_STATUS}
  @return: the user-visible name of the status
  @raise errors.ProgrammerError: if the status code is not known

  """
  if value not in _USER_JOB_STATUS:
    raise errors.ProgrammerError("Unknown job status code '%s'" % value)

  return _USER_JOB_STATUS[value]
70 71
72 -def _FormatSummary(value):
73 """Formats a job's summary. Takes possible non-ascii encoding into account. 74 75 """ 76 return ','.encode('utf-8').join(item.encode('utf-8') for item in value)
def _FormatTimestampList(value):
  """Formats every timestamp of a per-opcode timestamp list.

  A list comprehension is used instead of C{map} so that a real list is
  produced on every Python version (C{map} is lazy on Python 3).

  @param value: iterable of timestamps accepted by C{FormatTimestamp}
  @rtype: list
  @return: the formatted timestamps, in the same order

  """
  return [FormatTimestamp(ts) for ts in value]


#: per-field formatting overrides used when listing jobs; each entry is a
#: (formatting function, flag) pair whose exact meaning is defined by the
#: C{format_override} consumer (L{ListJobs} and L{_MultiJobAction} pass it on)
_JOB_LIST_FORMAT = {
  "status": (_FormatStatus, False),
  "summary": (_FormatSummary, False),
  }
_JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
                                      (_FormatTimestampList, None)))
89 -def _ParseJobIds(args):
90 """Parses a list of string job IDs into integers. 91 92 @param args: list of strings 93 @return: list of integers 94 @raise OpPrereqError: in case of invalid values 95 96 """ 97 try: 98 return [int(a) for a in args] 99 except (ValueError, TypeError), err: 100 raise errors.OpPrereqError("Invalid job ID passed: %s" % err, 101 errors.ECODE_INVAL)
102 103
def ListJobs(opts, args):
  """List the jobs

  @param opts: the command line options selected by the user
  @type args: list
  @param args: names handed to C{GenericList} together with
      C{namefield="id"}, i.e. the job IDs to list (may be empty)
  @rtype: int
  @return: the desired exit code

  """
  # Work on a private copy: ParseFields may hand back the default list
  # object itself (assumption, confirm against ganeti.cli), in which case
  # appending below would silently modify _LIST_DEF_FIELDS
  selected_fields = list(ParseFields(opts.output, _LIST_DEF_FIELDS))

  if opts.archived and "archived" not in selected_fields:
    selected_fields.append("archived")

  qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)

  cl = GetClient()

  return GenericList(constants.QR_JOB, selected_fields, args, None,
                     opts.separator, not opts.no_headers,
                     format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
                     force_filter=opts.force_filter, namefield="id",
                     qfilter=qfilter, isnumeric=True, cl=cl)
128 129
def ListJobFields(opts, args):
  """Lists the fields that can be queried for jobs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: fields to list, or empty for all
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()
  show_headers = not opts.no_headers

  return GenericListFields(constants.QR_JOB, args, opts.separator,
                           show_headers, cl=client)
144 145
def ArchiveJobs(opts, args):
  """Archives the given jobs one by one.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be archived
  @rtype: int
  @return: 0 if every job was archived, 1 if at least one failed

  """
  client = GetClient()

  exit_code = 0
  for job_id in args:
    if client.ArchiveJob(job_id):
      continue

    # Keep going after a failure so the remaining jobs are still tried
    ToStderr("Failed to archive job with ID '%s'", job_id)
    exit_code = 1

  return exit_code
165 166
def AutoArchiveJobs(opts, args):
  """Archives jobs depending on their age.

  Jobs older than the given age are archived; the keyword I{all} requests
  archival of all jobs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the age as a time spec
      that can be parsed by L{ganeti.cli.ParseTimespec} or the
      keyword I{all}, which will cause all jobs to be archived
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()

  age_spec = args[0]
  # -1 is what gets passed to the client for the "all" keyword
  age = -1 if age_spec == "all" else ParseTimespec(age_spec)

  (archived_count, jobs_left) = client.AutoArchiveJobs(age)
  ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)

  return 0
195 196
def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
  """Applies a function to multiple jobs.

  The jobs are either given explicitly as IDs or selected through the
  status filter in C{opts}; exactly one of the two must be used. When
  selecting by status and C{opts.force} is not set, the matching jobs are
  printed and the user has to confirm before anything is done.

  @param opts: Command line options
  @type args: list
  @param args: Job IDs
  @param cl: client to use; a new one is obtained if C{None}
  @param stdout_fn: output function; defaults to C{ToStdout} if C{None}
  @param ask_fn: confirmation function; defaults to C{AskUser} if C{None}
  @param question: text passed to C{ask_fn}
  @param action_fn: callable invoked as C{action_fn(cl, job_id)} and
      returning a C{(success, message)} pair
  @rtype: int
  @return: Exit code
  @raise errors.OpPrereqError: if both or neither of job IDs and status
      filter are given, or if the status filter matches no job

  """
  if cl is None:
    cl = GetClient()

  if stdout_fn is None:
    stdout_fn = ToStdout

  if ask_fn is None:
    ask_fn = AskUser

  exit_code = constants.EXIT_SUCCESS

  have_ids = bool(args)
  have_filter = opts.status_filter is not None

  # Exactly one selection mechanism must be used
  if have_ids == have_filter:
    raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
                               " specified and never both", errors.ECODE_INVAL)

  if have_filter:
    response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
                        qlang.MakeSimpleFilter("status", opts.status_filter))

    # First field of every row is the (result status, job ID) pair
    job_ids = [job_id for ((_, job_id), _, _) in response.data]
    if not job_ids:
      raise errors.OpPrereqError("No jobs with the requested status have been"
                                 " found", errors.ECODE_STATE)

    if not opts.force:
      (_, table) = FormatQueryResult(response, header=True,
                                     format_override=_JOB_LIST_FORMAT)
      for row in table:
        stdout_fn(row)

      if not ask_fn(question):
        return constants.EXIT_CONFIRMATION
  else:
    job_ids = args

  for job_id in job_ids:
    (success, msg) = action_fn(cl, job_id)

    if not success:
      exit_code = constants.EXIT_FAILURE

    stdout_fn(msg)

  return exit_code
251 252
def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
  """Cancels jobs, selected either by ID or by status filter.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be cancelled
  @param cl: client to use; L{_MultiJobAction} obtains one if C{None}
  @rtype: int
  @return: the desired exit code

  """
  def _Cancel(client, job_id):
    """Asks the given client to cancel a single job."""
    return client.CancelJob(job_id)

  return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
                         "Cancel job(s) listed above?", _Cancel)
266 267
def ChangePriority(opts, args):
  """Changes the priority of jobs selected by ID or by status filter.

  @param opts: Command line options
  @type args: list
  @param args: Job IDs
  @rtype: int
  @return: Exit code

  """
  if opts.priority is None:
    ToStderr("--priority option must be given.")
    return constants.EXIT_FAILURE

  def _SetPriority(client, job_id):
    """Asks the given client to set the requested priority on one job."""
    return client.ChangeJobPriority(job_id, opts.priority)

  return _MultiJobAction(opts, args, None, None, None,
                         "Change priority of job(s) listed above?",
                         _SetPriority)
286 287
288 -def _ListOpcodeTimestamp(name, ts, container):
289 """ Adds the opcode timestamp to the given container. 290 291 """ 292 if isinstance(ts, (tuple, list)): 293 container.append((name, FormatTimestamp(ts), "opcode_timestamp")) 294 else: 295 container.append((name, "N/A", "opcode_timestamp"))
296 297
298 -def _CalcDelta(from_ts, to_ts):
299 """ Calculates the delta between two timestamps. 300 301 """ 302 return to_ts[0] - from_ts[0] + (to_ts[1] - from_ts[1]) / 1000000.0
303 304
305 -def _ListJobTimestamp(name, ts, container, prior_ts=None):
306 """ Adds the job timestamp to the given container. 307 308 @param prior_ts: The timestamp used to calculate the amount of time that 309 passed since the given timestamp. 310 311 """ 312 if ts is not None: 313 delta = "" 314 if prior_ts is not None: 315 delta = " (delta %.6fs)" % _CalcDelta(prior_ts, ts) 316 output = "%s%s" % (FormatTimestamp(ts), delta) 317 container.append((name, output, "job_timestamp")) 318 else: 319 container.append((name, "unknown (%s)" % str(ts), "job_timestamp"))
320 321
def ShowJobs(opts, args):
  """Show detailed information about jobs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the job IDs to be queried
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if a job ID cannot be parsed
  @raise errors.ProgrammerError: if a job reports an unknown status code

  """
  selected_fields = [
    "id", "status", "ops", "opresult", "opstatus", "oplog",
    "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
    ]

  qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
  cl = GetClient()
  result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data

  job_info_container = []

  for entry in result:
    # Every field arrives as a (result status, value) pair, in the order of
    # selected_fields
    ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
     (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
     (_, start_ts), (_, end_ts)) = entry

    # Detect non-normal results
    if rs_status != constants.RS_NORMAL:
      job_info_container.append("Job ID %s not found" % job_id)
      continue

    # Container for produced data
    job_info = [("Job ID", job_id)]

    # Same mapping and same error as used for job listings
    job_info.append(("Status", _FormatStatus(status)))

    _ListJobTimestamp("Received", recv_ts, job_info)
    _ListJobTimestamp("Processing start", start_ts, job_info, prior_ts=recv_ts)
    _ListJobTimestamp("Processing end", end_ts, job_info, prior_ts=start_ts)

    if end_ts is not None and recv_ts is not None:
      job_info.append(("Total processing time", "%.6f seconds" %
                       _CalcDelta(recv_ts, end_ts)))
    else:
      job_info.append(("Total processing time", "N/A"))

    opcode_container = []
    # Distinct names are used for the per-opcode values so they do not
    # rebind the job-level "result" and "status" variables
    for (opcode, op_result, op_status, op_log, s_ts, x_ts, e_ts) in \
        zip(ops, opresult, opstatus, oplog, opstart, opexec, opend):
      opcode_info = []
      opcode_info.append(("Opcode", opcode["OP_ID"]))
      opcode_info.append(("Status", op_status))

      _ListOpcodeTimestamp("Processing start", s_ts, opcode_info)
      _ListOpcodeTimestamp("Execution start", x_ts, opcode_info)
      _ListOpcodeTimestamp("Processing end", e_ts, opcode_info)

      opcode_info.append(("Input fields", opcode))
      opcode_info.append(("Result", op_result))

      exec_log_container = []
      for serial, log_ts, log_type, log_msg in op_log:
        time_txt = FormatTimestamp(log_ts)
        encoded = FormatLogMessage(log_type, log_msg)

        # Arranged in this curious way to preserve the brevity for multiple
        # logs. This content cannot be exposed as a 4-tuple, as time contains
        # the colon, causing some YAML parsers to fail.
        exec_log_info = [
          ("Time", time_txt),
          ("Content", (serial, log_type, encoded,)),
          ]
        exec_log_container.append(exec_log_info)
      opcode_info.append(("Execution log", exec_log_container))

      opcode_container.append(opcode_info)

    job_info.append(("Opcodes", opcode_container))
    job_info_container.append(job_info)

  PrintGenericInfo(job_info_container)

  return 0
410 411
def WatchJob(opts, args):
  """Follow a job and print its output as it arrives.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling the job raised a
      L{errors.GenericError}, in which case the code computed by
      C{cli.FormatError} is returned

  """
  job_id = args[0]

  msg = ("Output from job %s follows" % job_id)
  ToStdout(msg)
  # Underline the header with a rule of the same length
  ToStdout("-" * len(msg))

  retcode = 0
  try:
    cli.PollJob(job_id)
  # "except ... as" is accepted by Python 2.6+ and Python 3, unlike the
  # comma form which only Python 2 understands
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
436 437
def WaitJob(opts, args):
  """Wait for a job to finish, not producing any output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: Contains the job ID
  @rtype: int
  @return: the desired exit code; 0 unless polling the job raised a
      L{errors.GenericError}, in which case the code computed by
      C{cli.FormatError} is returned

  """
  job_id = args[0]

  retcode = 0
  try:
    # The no-op feedback function discards the job's messages
    cli.PollJob(job_id, feedback_fn=lambda _: None)
  # "except ... as" is accepted by Python 2.6+ and Python 3, unlike the
  # comma form which only Python 2 understands
  except errors.GenericError as err:
    (retcode, job_result) = cli.FormatError(err)
    ToStderr("Job %s failed: %s", job_id, job_result)

  return retcode
def _StatusFilterOpt(flag, statuses, help_text):
  """Builds an option storing a set of job statuses in C{status_filter}.

  @param flag: the command line flag, e.g. C{--pending}
  @param statuses: value stored in C{opts.status_filter} when the flag is
      given
  @param help_text: help string of the option
  @return: the option object created by C{cli_option}

  """
  return cli_option(flag, default=None,
                    action="store_const", dest="status_filter",
                    const=statuses, help=help_text)


_PENDING_OPT = _StatusFilterOpt(
  "--pending", constants.JOBS_PENDING,
  "Select jobs pending execution or being cancelled")

_RUNNING_OPT = _StatusFilterOpt(
  "--running", frozenset([constants.JOB_STATUS_RUNNING]),
  "Show jobs currently running only")

_ERROR_OPT = _StatusFilterOpt(
  "--error", frozenset([constants.JOB_STATUS_ERROR]),
  "Show failed jobs only")

_FINISHED_OPT = _StatusFilterOpt(
  "--finished", constants.JOBS_FINALIZED,
  "Show finished jobs only")

_ARCHIVED_OPT = cli_option(
  "--archived", default=False,
  action="store_true", dest="archived",
  help="Include archived jobs in list (slow and expensive)")

_QUEUED_OPT = _StatusFilterOpt(
  "--queued", frozenset([constants.JOB_STATUS_QUEUED]),
  "Select queued jobs only")

_WAITING_OPT = _StatusFilterOpt(
  "--waiting", frozenset([constants.JOB_STATUS_WAITING]),
  "Select waiting jobs only")


#: dictionary mapping each sub-command name to a tuple of (handler function,
#: argument definitions, accepted options, usage string, description)
commands = {
  "list": (
    ListJobs, [ArgJobId()],
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
     _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
    "[job_id ...]",
    "Lists the jobs and their status. The available fields can be shown"
    " using the \"list-fields\" command (see the man page for details)."
    " The default field list is (in order): %s." %
    utils.CommaJoin(_LIST_DEF_FIELDS)),
  "list-fields": (
    ListJobFields, [ArgUnknown()],
    [NOHDR_OPT, SEP_OPT],
    "[fields...]",
    "Lists all available fields for jobs"),
  "archive": (
    ArchiveJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]", "Archive specified jobs"),
  "autoarchive": (
    AutoArchiveJobs,
    [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
    [],
    "<age>", "Auto archive jobs older than the given age"),
  "cancel": (
    CancelJobs, [ArgJobId()],
    [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
    "{[--force] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Cancel jobs"),
  "info": (
    ShowJobs, [ArgJobId(min=1)], [],
    "<job-id> [<job-id> ...]",
    "Show detailed information about the specified jobs"),
  "wait": (
    WaitJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Wait for a job to finish"),
  "watch": (
    WatchJob, [ArgJobId(min=1, max=1)], [],
    "<job-id>", "Follows a job and prints its output as it arrives"),
  "change-priority": (
    ChangePriority, [ArgJobId()],
    [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
    "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
    " <job-id> [<job-id> ...]}",
    "Change the priority of jobs"),
  }


#: dictionary with aliases for commands
aliases = {
  "show": "info",
  }

def Main():
  """Entry point: dispatches to the sub-command selected by the user.

  @return: whatever C{GenericMain} returns for the L{commands} table and
      its L{aliases}

  """
  exit_status = GenericMain(commands, aliases=aliases)
  return exit_status
566