1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Module implementing the execution of a job as a separate process

The complete protocol for initializing a job is described in the Haskell
module Ganeti.Query.Exec.
"""
36
37 import contextlib
38 import logging
39 import os
40 import signal
41 import sys
42 import time
43
44 from ganeti import mcpu
45 from ganeti.server import masterd
46 from ganeti.rpc import transport
47 from ganeti import serializer
48 from ganeti import utils
49 from ganeti import pathutils
50 from ganeti.utils import livelock
51
52 from ganeti.jqueue import _JobProcessor
53
54
def _GetMasterInfo():
  """Receive the job id, livelock name and secret parameters from the master.

  The three values are read one after another, in exactly this order, from a
  transport over file descriptors 0 and 1. The transport is closed before the
  function returns, which also closes standard input/output.

  @rtype: tuple
  @return: (job id as an int, the livelock name wrapped in
      L{livelock.LiveLockName}, the secret parameters exactly as received --
      a JSON encoding of a list of dicts)

  """
  logging.debug("Opening transport over stdin/out")
  channel = transport.FdTransport((0, 1))
  with contextlib.closing(channel):
    logging.debug("Reading job id from the master process")
    raw_id = channel.Call("")
    jid = int(raw_id)
    logging.debug("Got job id %d", jid)

    logging.debug("Reading the livelock name from the master process")
    lock = livelock.LiveLockName(channel.Call(""))
    logging.debug("Got livelock %s", lock)

    logging.debug("Reading secret parameters from the master process")
    secrets_blob = channel.Call("")
    logging.debug("Got secret parameters.")

  return jid, lock, secrets_blob
75
76
def RestorePrivateValueWrapping(json):
  """Re-wrap JSON-decoded secret values in L{serializer.PrivateDict} objects.

  Every element of C{json} is turned into one PrivateDict, preserving order.
  An element that is C{None} becomes an empty PrivateDict.

  @param json: the JSON-decoded value to protect; it is iterated element by
      element (each element a dict or C{None})
  @rtype: list
  @return: one L{serializer.PrivateDict} per element of C{json}

  """
  return [serializer.PrivateDict() if entry is None
          else serializer.PrivateDict(entry)
          for entry in json]
92
93
def _InstallSignalHandlers(cancel, prio_change):
  """Install the SIGTERM, SIGHUP and SIGUSR1 handlers of the job process.

  The handlers only record that the signal arrived by setting the first item
  of a one-element list; the processing loop in L{main} polls and resets the
  C{cancel} and C{prio_change} flags.

  @type cancel: list
  @param cancel: one-element list whose item is set to True on SIGTERM
  @type prio_change: list
  @param prio_change: one-element list whose item is set to True on SIGUSR1

  """
  def _TermHandler(signum, _frame):
    logging.info("Killed by signal %d", signum)
    cancel[0] = True
  signal.signal(signal.SIGTERM, _TermHandler)

  def _HupHandler(signum, _frame):
    # Log the current flag value, not the one-element list holding it.
    logging.debug("Received signal %d, old flag was %s, will set to True",
                  signum, mcpu.sighupReceived[0])
    mcpu.sighupReceived[0] = True
  signal.signal(signal.SIGHUP, _HupHandler)

  def _User1Handler(signum, _frame):
    logging.info("Received signal %d, indicating priority change", signum)
    prio_change[0] = True
  signal.signal(signal.SIGUSR1, _User1Handler)


def _ApplySecretParams(job, secret_params):
  """Attach the secret OS parameters to the opcodes of a job.

  The i-th entry of C{secret_params} is stored as C{osparams_secret} on the
  input of the i-th opcode. This happens only for opcodes whose input already
  has that attribute.

  @param job: the job as loaded from disk
  @type secret_params: list
  @param secret_params: one entry per opcode, as produced by
      L{RestorePrivateValueWrapping}

  """
  # Index into job.ops rather than zip() with it, so that a surplus entry
  # is not silently dropped.
  for idx, secret in enumerate(secret_params):
    op_input = job.ops[idx].input
    if hasattr(op_input, "osparams_secret"):
      op_input.osparams_secret = secret


def _ReadNewPriority(job_id):
  """Read and consume the priority requested for a job.

  The value is read from C{<job_id>.prio} in
  C{pathutils.LUXID_MESSAGE_DIR}. The file is removed once it has been read.

  @type job_id: int
  @param job_id: the id of the job whose priority is to change
  @rtype: int
  @return: the new priority

  """
  fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
  new_prio = int(utils.ReadFile(fname))
  utils.RemoveFile(fname)
  return new_prio


def main():
  """Run a single job in this process, as instructed by the master.

  The debug level is read from the C{GNT_DEBUG} environment variable. The
  job id, livelock name and secret parameters are received over
  stdin/stdout. The job is then loaded from disk and its L{_JobProcessor}
  is driven until it reports C{FINISHED}.

  Signals are handled between processor steps:
    - SIGTERM cancels the job.
    - SIGUSR1 changes its priority.
    - SIGHUP only sets C{mcpu.sighupReceived[0]}.

  Exceptions raised while running the job are logged. Afterwards the
  livelock file is removed and the process exits with status 0.

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name, secret_params_serialized) = _GetMasterInfo()

  secret_params = None
  if secret_params_serialized:
    secret_params_json = serializer.LoadJson(secret_params_serialized)
    secret_params = RestorePrivateValueWrapping(secret_params_json)

  # The job id is known only now; redo the logging setup with a per-job name.
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")
    # One-element lists so the signal handlers can flip the flags in place.
    cancel = [False]
    prio_change = [False]
    _InstallSignalHandlers(cancel, prio_change)

    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)

    job.SetPid(os.getpid())

    if secret_params:
      _ApplySecretParams(job, secret_params)

    execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
    proc = _JobProcessor(context.jobqueue, execfun, job)
    result = _JobProcessor.DEFER
    while result != _JobProcessor.FINISHED:
      result = proc()
      if result == _JobProcessor.WAITDEP and not cancel[0]:
        # The job should not have been started before its dependencies
        # finished; warn, back off, and let the processor re-check on the
        # next iteration.
        logging.warning("Got started despite a dependency not yet finished")
        time.sleep(5)
      if cancel[0]:
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        # Reload the job from disk (presumably updated by CancelJob) and
        # restart the processor on the fresh copy.
        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
        proc = _JobProcessor(context.jobqueue, execfun, job)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
        cancel[0] = False
      if prio_change[0]:
        logging.debug("Received priority-change request")
        try:
          new_prio = _ReadNewPriority(job_id)
        except Exception:
          logging.warning("Informed of priority change, but could not"
                          " read new priority", exc_info=True)
        else:
          # Best effort, as before: a failed change is logged and the job
          # keeps running with its current processor.
          try:
            logging.debug("Changing priority of job %d to %d", job_id,
                          new_prio)
            r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
            job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
            proc = _JobProcessor(context.jobqueue, execfun, job)
            logging.debug("Result of changing priority of %d to %d: %s",
                          job_id, new_prio, r)
          except Exception:
            logging.warning("Failed to change priority of job %d to %d",
                            job_id, new_prio, exc_info=True)
        prio_change[0] = False

  except Exception:
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    logging.debug("Removing livelock file %s", livelock_name.GetPath())
    os.remove(livelock_name.GetPath())

  sys.exit(0)
186
if __name__ == '__main__':
  # Run the job only when executed as a script, not when imported.
  main()
189