1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31
32
34 """Reads a file containing a number.
35
36 @rtype: None or int
37 @return: None if file is not found, otherwise number
38
39 """
40 try:
41 return int(utils.ReadFile(file_name))
42 except EnvironmentError, err:
43 if err.errno in (errno.ENOENT, ):
44 return None
45 raise
46
47
55
56
64
65
67 """Open and lock job queue.
68
69 If necessary, the queue is automatically initialized.
70
71 @type must_lock: bool
72 @param must_lock: Whether an exclusive lock must be held.
73 @rtype: utils.FileLock
74 @return: Lock object for the queue. This can be used to change the
75 locking mode.
76
77 """
78 getents = runtime.GetEnts()
79
80
81 queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
82 try:
83
84
85 if must_lock:
86 queue_lock.Exclusive(blocking=True)
87 holding_lock = True
88 else:
89 try:
90 queue_lock.Exclusive(blocking=False)
91 holding_lock = True
92 except errors.LockError:
93
94
95 holding_lock = False
96
97 if holding_lock:
98
99 version = ReadVersion()
100 if version is None:
101
102 utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
103 uid=getents.masterd_uid, gid=getents.masterd_gid,
104 data="%s\n" % constants.JOB_QUEUE_VERSION)
105
106
107 version = ReadVersion()
108
109 if version != constants.JOB_QUEUE_VERSION:
110 raise errors.JobQueueError("Found job queue version %s, expected %s",
111 version, constants.JOB_QUEUE_VERSION)
112
113 serial = ReadSerial()
114 if serial is None:
115
116 utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
117 uid=getents.masterd_uid, gid=getents.masterd_gid,
118 data="%s\n" % 0)
119
120
121 serial = ReadSerial()
122
123 if serial is None:
124
125 raise errors.JobQueueError("Can't read/parse the job queue"
126 " serial file")
127
128 if not must_lock:
129
130
131 queue_lock.Unlock()
132
133 except:
134 queue_lock.Close()
135 raise
136
137 return queue_lock
138
139
141 """Check if the queue is marked to be drained.
142
143 This currently uses the queue drain file, which makes it a per-node flag.
144 In the future this can be moved to the config file.
145
146 @rtype: boolean
147 @return: True if the job queue is marked drained
148
149 """
150 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
151
152
171