1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25
26 from ganeti import constants
27 from ganeti import errors
28 from ganeti import runtime
29 from ganeti import utils
30
31
def _ReadNumericFile(file_name):
    """Reads a file containing a number.

    @param file_name: Path of the file to read (passed to
        L{utils.ReadFile})
    @rtype: None or int
    @return: None if file is not found, otherwise number
    @raise EnvironmentError: For any I/O error other than the file
        being absent (C{ENOENT})

    """
    try:
        # int() is applied to the raw file contents; a ValueError from
        # unparsable contents is not handled here and propagates.
        return int(utils.ReadFile(file_name))
    except EnvironmentError as err:
        # Only a missing file is reported as None; every other
        # environment error is re-raised unchanged.
        if err.errno == errno.ENOENT:
            return None
        raise
45
46
54
55
63
64
def InitAndVerifyQueue(must_lock):
    """Open and lock job queue.

    If necessary, the queue is automatically initialized: a missing
    version file or serial file is created while the exclusive lock is
    held.

    @type must_lock: bool
    @param must_lock: Whether an exclusive lock must be held.
    @rtype: utils.FileLock
    @return: Lock object for the queue. This can be used to change the
        locking mode.
    @raise errors.JobQueueError: If the version file holds an unexpected
        version, or the serial file still cannot be read after it has
        been written

    """
    getents = runtime.GetEnts()

    # Open the lock file first; everything below runs under the try block
    # so the lock file is closed again if anything fails.
    queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
    try:
        # The version and serial files are only touched while holding
        # the exclusive lock.
        if must_lock:
            queue_lock.Exclusive(blocking=True)
            holding_lock = True
        else:
            try:
                queue_lock.Exclusive(blocking=False)
                holding_lock = True
            except errors.LockError:
                # Lock not available without blocking: skip the
                # verification below instead of failing.
                holding_lock = False

        if holding_lock:
            # Verify the version, creating the version file if missing.
            version = ReadVersion()
            if version is None:
                utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                                uid=getents.masterd_uid,
                                gid=getents.masterd_gid,
                                data="%s\n" % constants.JOB_QUEUE_VERSION)

                # Read back what was just written.
                version = ReadVersion()

            if version != constants.JOB_QUEUE_VERSION:
                # Interpolate the values into the message; passing them
                # as extra exception arguments leaves the "%s"
                # placeholders unformatted.
                raise errors.JobQueueError(
                    "Found job queue version %s, expected %s" %
                    (version, constants.JOB_QUEUE_VERSION))

            # Verify the serial, creating the serial file if missing.
            serial = ReadSerial()
            if serial is None:
                utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                                uid=getents.masterd_uid,
                                gid=getents.masterd_gid,
                                data="%s\n" % 0)

                # Read back what was just written.
                serial = ReadSerial()

            if serial is None:
                # Still unreadable after writing it.
                raise errors.JobQueueError("Can't read/parse the job queue"
                                           " serial file")

            if not must_lock:
                # The caller did not ask for the lock to be held, so
                # release it again; the lock object itself stays open.
                queue_lock.Unlock()

    except:
        # Deliberately catches everything: close the lock file on any
        # failure, then re-raise the original exception unchanged.
        queue_lock.Close()
        raise

    return queue_lock
137