Package ganeti :: Package jqueue :: Module exec
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.jqueue.exec

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
  31  """Module implementing the execution of a job as a separate process 
  32   
  33  The complete protocol of initializing a job is described in the Haskell 
  34  module Ganeti.Query.Exec 
 35  """ 
 36   
 37  import contextlib 
 38  import logging 
 39  import os 
 40  import signal 
 41  import sys 
 42  import time 
 43   
 44  from ganeti import mcpu 
 45  from ganeti.server import masterd 
 46  from ganeti.rpc import transport 
 47  from ganeti import utils 
 48  from ganeti import pathutils 
 49  from ganeti.utils import livelock 
 50   
 51   
def _GetMasterInfo():
  """Obtain the job id and the livelock name sent by the master process.

  Both values are requested, job id first, over a
  L{transport.FdTransport} built on file descriptors 0 and 1 (standard
  input/output). The transport is closed before this function returns,
  which also closes standard input/output.

  @rtype: tuple
  @return: C{(job_id, livelock_name)}, where C{job_id} is an C{int} and
      C{livelock_name} is a L{livelock.LiveLockName}

  """
  logging.debug("Opening transport over stdin/out")
  master = transport.FdTransport((0, 1))
  with contextlib.closing(master):
    logging.debug("Reading job id from the master process")
    raw_id = master.Call("")
    job_id = int(raw_id)
    logging.debug("Got job id %d", job_id)

    logging.debug("Reading the livelock name from the master process")
    lock = livelock.LiveLockName(master.Call(""))
    logging.debug("Got livelock %s", lock)
  return job_id, lock
67 68
def main():
  """Run a single job in this process, as instructed by the master.

  The debug level is read from the C{GNT_DEBUG} environment variable. The
  job id and the livelock name are received from the master over standard
  input/output (see L{_GetMasterInfo}). The job is then picked up from the
  queue. This process polls once per second until the job has been
  finalized, reacting to signals in between:

    - C{SIGTERM} requests cancellation of the job;
    - C{SIGHUP} sets C{mcpu.sighupReceived[0]} to C{True};
    - C{SIGUSR1} announces a priority change; the new priority is read
      from C{<LUXID_MESSAGE_DIR>/<job_id>.prio}, and that file is removed.

  Afterwards the process waits while the queue's C{PrepareShutdown()}
  returns a true value, then shuts the queue down. Finally it removes the
  livelock file and exits with status 0 on success, or 1 if an exception
  was raised while running the job.

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name) = _GetMasterInfo()

  # Re-initialise logging now that the job id is known, switching the tag
  # from "job-startup" to "job-<id>".
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  exit_code = 1
  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # The handlers only set flags. One-element lists are used so that the
    # nested functions can mutate them. The actual work is done in the
    # polling loop below.
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      # Log the flag value itself, not the list that holds it.
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived[0])
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    logging.debug("Picking up job %d", job_id)
    context.jobqueue.PickupJob(job_id)

    # waiting for the job to finish
    time.sleep(1)
    while not context.jobqueue.HasJobBeenFinalized(job_id):
      if cancel[0]:
        # Clear the flag *before* acting on it. A signal delivered while the
        # request is being processed is then kept for the next iteration
        # instead of being overwritten.
        cancel[0] = False
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
      if prio_change[0]:
        # Same reasoning as above: clear first so no request is lost.
        prio_change[0] = False
        logging.debug("Received priority-change request")
        try:
          fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
          new_prio = int(utils.ReadFile(fname))
          utils.RemoveFile(fname)
          logging.debug("Changing priority of job %d to %d", job_id, new_prio)
          r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
          logging.debug("Result of changing priority of %d to %d: %s", job_id,
                        new_prio, r)
        except Exception: # pylint: disable=W0703
          # Best effort: keep the job running, but record why it failed.
          logging.warning("Informed of priority change, but could not"
                          " read new priority", exc_info=True)
      time.sleep(1)

    # wait until the queue finishes
    logging.debug("Waiting for the queue to finish")
    while context.jobqueue.PrepareShutdown():
      time.sleep(1)
    logging.debug("Shutting the queue down")
    context.jobqueue.Shutdown()
    exit_code = 0
  except Exception: # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    livelock_path = livelock_name.GetPath()
    logging.debug("Removing livelock file %s", livelock_path)
    try:
      os.remove(livelock_path)
    except OSError:
      # Do not let a failed cleanup escape the finally clause. That would
      # skip sys.exit() below and discard the exit code computed above.
      logging.exception("Could not remove livelock file %s", livelock_path)

  sys.exit(exit_code)

if __name__ == '__main__':
  main()