Package ganeti :: Package jqueue :: Module exec
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.jqueue.exec

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
  31  """Module implementing the execution of a job as a separate process
  32
  33  The complete protocol for initializing a job is described in the Haskell
  34  module Ganeti.Query.Exec
 35  """ 
 36   
 37  import contextlib 
 38  import logging 
 39  import os 
 40  import signal 
 41  import sys 
 42  import time 
 43   
 44  from ganeti import mcpu 
 45  from ganeti.server import masterd 
 46  from ganeti.rpc import transport 
 47  from ganeti import serializer 
 48  from ganeti import utils 
 49  from ganeti import pathutils 
 50  from ganeti.utils import livelock 
 51   
 52  from ganeti.jqueue import _JobProcessor 
 53   
 54   
def _GetMasterInfo():
  """Read the job's start-up data sent by the master over stdin/stdout.

  Three messages are requested, in this order, on a transport built over
  file descriptors 0 and 1: the job id, the livelock name and the
  serialized secret parameters. The transport is closed before returning,
  which also closes standard input/output.

  @rtype: tuple
  @return: (job id as int, L{livelock.LiveLockName}, secret parameters
      exactly as received, i.e. a JSON encoding of a list of dicts)

  """
  logging.debug("Opening transport over stdin/out")
  with contextlib.closing(transport.FdTransport((0, 1))) as master:
    logging.debug("Reading job id from the master process")
    raw_job_id = master.Call("")
    job_id = int(raw_job_id)
    logging.debug("Got job id %d", job_id)

    logging.debug("Reading the livelock name from the master process")
    raw_lock_name = master.Call("")
    lock_name = livelock.LiveLockName(raw_lock_name)
    logging.debug("Got livelock %s", lock_name)

    logging.debug("Reading secret parameters from the master process")
    serialized_secrets = master.Call("")
    logging.debug("Got secret parameters.")

  return job_id, lock_name, serialized_secrets
75 76
def RestorePrivateValueWrapping(json):
  """Re-wrap decoded secret parameters as L{serializer.PrivateDict} objects.

  @param json: the JSON-decoded value to protect; an iterable whose items
      are C{None} or (presumably) dicts of secret values
  @rtype: list
  @return: one L{serializer.PrivateDict} per item of C{json}, in order;
      C{None} items are replaced by a C{PrivateDict()} built with no
      arguments

  """
  def _Protect(entry):
    # A missing entry still yields a (no-argument) PrivateDict so the
    # result stays index-aligned with the input.
    if entry is None:
      return serializer.PrivateDict()
    return serializer.PrivateDict(entry)

  return [_Protect(entry) for entry in json]
92 93
def main():
  """Run a single job in this process, as requested by the master.

  Sets up logging, reads the job id, livelock name and secret parameters
  from the master over stdin/stdout, loads the job from disk and drives a
  L{_JobProcessor} until it reports C{FINISHED}. While running:
    - SIGTERM requests cancellation of the job,
    - SIGHUP sets C{mcpu.sighupReceived[0]},
    - SIGUSR1 makes the loop read a new priority from
      C{<LUXID_MESSAGE_DIR>/<job_id>.prio} and apply it.

  Any exception raised while running the job is logged. The livelock
  file is always removed, and the process then exits with status 0.

  """
  # NOTE(review): GNT_DEBUG is expected to be set by the parent process;
  # fall back to non-debug logging rather than dying with a KeyError before
  # logging has even been configured.
  debug = int(os.environ.get("GNT_DEBUG", "0"))

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name, secret_params_serialized) = _GetMasterInfo()

  secret_params = []
  if secret_params_serialized:
    secret_params_json = serializer.LoadJson(secret_params_serialized)
    secret_params = RestorePrivateValueWrapping(secret_params_json)

  # Re-initialise logging now that the job id is known, so that every
  # further message is tagged with it.
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # One-element lists so the nested signal handlers can mutate them.
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      # Log the flag's value, not the list object that holds it.
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived[0])
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)

    job.SetPid(os.getpid())

    if secret_params:
      # Secret parameters are matched to opcodes by position.
      if len(secret_params) != len(job.ops):
        logging.warning("Got %d sets of secret parameters for %d opcodes",
                        len(secret_params), len(job.ops))
      for op, op_secrets in zip(job.ops, secret_params):
        if hasattr(op.input, "osparams_secret"):
          op.input.osparams_secret = op_secrets

    execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
    proc = _JobProcessor(context.jobqueue, execfun, job)
    result = _JobProcessor.DEFER
    while result != _JobProcessor.FINISHED:
      result = proc()
      if result == _JobProcessor.WAITDEP and not cancel[0]:
        # Normally, the scheduler should avoid starting a job where the
        # dependencies are not yet finalised. So warn, but wait and continue.
        logging.warning("Got started despite a dependency not yet finished")
        time.sleep(5)
      if cancel[0]:
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        # The job's on-disk state has changed; reload it and restart the
        # processor from that state.
        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
        proc = _JobProcessor(context.jobqueue, execfun, job)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
        cancel[0] = False
      if prio_change[0]:
        logging.debug("Received priority-change request")
        try:
          fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
          new_prio = int(utils.ReadFile(fname))
          utils.RemoveFile(fname)
          logging.debug("Changing priority of job %d to %d", job_id, new_prio)
          r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
          job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
          proc = _JobProcessor(context.jobqueue, execfun, job)
          logging.debug("Result of changing priority of %d to %d: %s", job_id,
                        new_prio, r)
        except Exception: # pylint: disable=W0703
          # Best effort: keep running the job, but record the actual cause.
          logging.warning("Informed of priority change, but could not"
                          " read new priority", exc_info=True)
        prio_change[0] = False

  except Exception: # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    logging.debug("Removing livelock file %s", livelock_name.GetPath())
    os.remove(livelock_name.GetPath())

  sys.exit(0)


if __name__ == '__main__':
  main()