1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Module implementing the execution of a job as a separate process.

The complete protocol for initializing a job is described in the Haskell
module Ganeti.Query.Exec.
"""
36
37 import contextlib
38 import logging
39 import os
40 import signal
41 import sys
42 import time
43
44 from ganeti import mcpu
45 from ganeti.server import masterd
46 from ganeti.rpc import transport
47 from ganeti import utils
48 from ganeti import pathutils
49 from ganeti.utils import livelock
50
51
def _GetMasterInfo():
  """Obtain the job id and the livelock name from the master process.

  Both values are requested, job id first, over a transport built on file
  descriptors 0 and 1 (standard input/output). The transport is closed
  before returning, which also closes standard input/output.

  Returns a pair (job_id, livelock_name): the job id converted to an int,
  and the livelock name wrapped in livelock.LiveLockName.

  """
  logging.debug("Opening transport over stdin/out")
  with contextlib.closing(transport.FdTransport((0, 1))) as channel:
    logging.debug("Reading job id from the master process")
    raw_job_id = channel.Call("")
    job_id = int(raw_job_id)
    logging.debug("Got job id %d", job_id)

    logging.debug("Reading the livelock name from the master process")
    raw_lock_name = channel.Call("")
    lock_name = livelock.LiveLockName(raw_lock_name)
    logging.debug("Got livelock %s", lock_name)

  return job_id, lock_name
67
68
def _ApplyPriorityChange(jobqueue, job_id):
  """Apply a pending priority-change request to a job.

  The new priority is read from the file "<job_id>.prio" inside
  pathutils.LUXID_MESSAGE_DIR, the file is removed, and the value is passed
  to jobqueue.ChangeJobPriority. Failures to read the request and failures
  to apply it are logged with distinct messages; neither is propagated, so
  a bad request does not abort the running job.

  """
  try:
    fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
    new_prio = int(utils.ReadFile(fname))
    utils.RemoveFile(fname)
  except Exception: # pylint: disable=W0703
    logging.warning("Informed of priority change, but could not"
                    " read new priority")
    return

  logging.debug("Changing priority of job %d to %d", job_id, new_prio)
  try:
    result = jobqueue.ChangeJobPriority(job_id, new_prio)
  except Exception: # pylint: disable=W0703
    # Previously this was reported as a read failure; log it for what it is.
    logging.exception("Failed to change priority of job %d to %d",
                      job_id, new_prio)
    return
  logging.debug("Result of changing priority of %d to %d: %s", job_id,
                new_prio, result)


def main():
  """Run a single job in this process on behalf of the master.

  Reads the integer debug level from the GNT_DEBUG environment variable,
  obtains the job id and livelock name from the master over standard
  input/output, and picks the job up from the job queue. It then polls once
  per second until the job queue reports the job as finalized, acting in
  between on cancel requests (SIGTERM) and priority-change requests
  (SIGUSR1). SIGHUP sets mcpu.sighupReceived[0]. After the queue has been
  shut down, the livelock file is removed.

  Exits via sys.exit with status 0 when the job ran and the queue shut down
  cleanly, and 1 if an exception occurred or the livelock file could not be
  removed.

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name) = _GetMasterInfo()

  # Re-initialize logging now that the job id is known.
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  exit_code = 1
  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # The handlers only raise flags; the polling loop below acts on them.
    # Single-element lists let the nested handlers mutate the flags.
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      # Log the flag value itself rather than the list holding it.
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived[0])
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    logging.debug("Picking up job %d", job_id)
    context.jobqueue.PickupJob(job_id)

    # Poll once per second until the job has been finalized.
    time.sleep(1)
    while not context.jobqueue.HasJobBeenFinalized(job_id):
      if cancel[0]:
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
        cancel[0] = False
      if prio_change[0]:
        logging.debug("Received priority-change request")
        _ApplyPriorityChange(context.jobqueue, job_id)
        prio_change[0] = False
      time.sleep(1)

    logging.debug("Waiting for the queue to finish")
    while context.jobqueue.PrepareShutdown():
      time.sleep(1)
    logging.debug("Shutting the queue down")
    context.jobqueue.Shutdown()
    exit_code = 0
  except Exception: # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    lock_path = livelock_name.GetPath()
    logging.debug("Removing livelock file %s", lock_path)
    try:
      os.remove(lock_path)
    except OSError:
      # Do not let a failure here escape the finally clause unlogged and
      # skip sys.exit; record it in the job log and report failure instead.
      logging.exception("Failed to remove livelock file %s", lock_path)
      exit_code = 1

  sys.exit(exit_code)
148
# Run the job executor only when invoked as a script, not when imported.
if __name__ == "__main__":
  main()
151