1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
32
33
34 JOBS_PER_ARCHIVE_DIRECTORY = 10000
35
36
38 """Reads a file containing a number.
39
40 @rtype: None or int
41 @return: None if file is not found, otherwise number
42
43 """
44 try:
45 contents = utils.ReadFile(file_name)
46 except EnvironmentError, err:
47 if err.errno in (errno.ENOENT, ):
48 return None
49 raise
50
51 try:
52 return int(contents)
53 except (ValueError, TypeError), err:
54
55 raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
56 (file_name, err))
57
58
66
67
75
76
78 """Open and lock job queue.
79
80 If necessary, the queue is automatically initialized.
81
82 @type must_lock: bool
83 @param must_lock: Whether an exclusive lock must be held.
84 @rtype: utils.FileLock
85 @return: Lock object for the queue. This can be used to change the
86 locking mode.
87
88 """
89 getents = runtime.GetEnts()
90
91
92 queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
93 try:
94
95
96 if must_lock:
97 queue_lock.Exclusive(blocking=True)
98 holding_lock = True
99 else:
100 try:
101 queue_lock.Exclusive(blocking=False)
102 holding_lock = True
103 except errors.LockError:
104
105
106 holding_lock = False
107
108 if holding_lock:
109
110 version = ReadVersion()
111 if version is None:
112
113 utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114 uid=getents.masterd_uid, gid=getents.daemons_gid,
115 mode=constants.JOB_QUEUE_FILES_PERMS,
116 data="%s\n" % constants.JOB_QUEUE_VERSION)
117
118
119 version = ReadVersion()
120
121 if version != constants.JOB_QUEUE_VERSION:
122 raise errors.JobQueueError("Found job queue version %s, expected %s",
123 version, constants.JOB_QUEUE_VERSION)
124
125 serial = ReadSerial()
126 if serial is None:
127
128 utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
129 uid=getents.masterd_uid, gid=getents.daemons_gid,
130 mode=constants.JOB_QUEUE_FILES_PERMS,
131 data="%s\n" % 0)
132
133
134 serial = ReadSerial()
135
136 if serial is None:
137
138 raise errors.JobQueueError("Can't read/parse the job queue"
139 " serial file")
140
141 if not must_lock:
142
143
144 queue_lock.Unlock()
145
146 except:
147 queue_lock.Close()
148 raise
149
150 return queue_lock
151
152
154 """Check if the queue is marked to be drained.
155
156 This currently uses the queue drain file, which makes it a per-node flag.
157 In the future this can be moved to the config file.
158
159 @rtype: boolean
160 @return: True if the job queue is marked drained
161
162 """
163 return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
164
165
185
186
205
206
208 """Returns the archive directory for a job.
209
210 @type job_id: str
211 @param job_id: Job identifier
212 @rtype: str
213 @return: Directory name
214
215 """
216 return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
217
218
220 """Parses a job ID and converts it to integer.
221
222 """
223 try:
224 return int(job_id)
225 except (ValueError, TypeError):
226 raise errors.ParameterError("Invalid job ID '%s'" % job_id)
227