1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30
32 """Base worker class for worker pools.
33
34 Users of a worker pool must override RunTask in a subclass.
35
36 """
37
39 """Constructor for BaseWorker thread.
40
41 @param pool: the parent worker pool
42 @param worker_id: identifier for this worker
43
44 """
45 super(BaseWorker, self).__init__(name=worker_id)
46 self.pool = pool
47 self._current_task = None
48
50 """Returns whether a worker should terminate.
51
52 """
53 return self.pool.ShouldWorkerTerminate(self)
54
56 """Returns whether this worker is currently running a task.
57
58 """
59 return (self._current_task is not None)
60
62 """Returns whether this worker is currently running a task.
63
64 """
65 self.pool._lock.acquire()
66 try:
67 return self._HasRunningTaskUnlocked()
68 finally:
69 self.pool._lock.release()
70
72 """Main thread function.
73
74 Waits for new tasks to show up in the queue.
75
76 """
77 pool = self.pool
78
79 assert not self.HasRunningTask()
80
81 while True:
82 try:
83
84 pool._lock.acquire()
85 try:
86 if pool._ShouldWorkerTerminateUnlocked(self):
87 break
88
89
90 if not pool._tasks:
91 logging.debug("Waiting for tasks")
92
93
94 pool._pool_to_worker.wait()
95
96 logging.debug("Notified while waiting")
97
98
99 if pool._ShouldWorkerTerminateUnlocked(self):
100 break
101
102 if not pool._tasks:
103
104 continue
105
106
107 try:
108 self._current_task = pool._tasks.popleft()
109 finally:
110 pool._worker_to_pool.notifyAll()
111 finally:
112 pool._lock.release()
113
114
115 try:
116 logging.debug("Starting task %r", self._current_task)
117 self.RunTask(*self._current_task)
118 logging.debug("Done with task %r", self._current_task)
119 except:
120 logging.exception("Caught unhandled exception")
121 finally:
122
123 pool._lock.acquire()
124 try:
125 if self._current_task:
126 self._current_task = None
127 pool._worker_to_pool.notifyAll()
128 finally:
129 pool._lock.release()
130
131 logging.debug("Terminates")
132
134 """Function called to start a task.
135
136 This needs to be implemented by child classes.
137
138 """
139 raise NotImplementedError()
140
141
143 """Worker pool with a queue.
144
145 This class is thread-safe.
146
147 Tasks are guaranteed to be started in the order in which they're
148 added to the pool. Due to the nature of threading, they're not
149 guaranteed to finish in the same order.
150
151 """
152 - def __init__(self, name, num_workers, worker_class):
153 """Constructor for worker pool.
154
155 @param num_workers: number of workers to be started
156 (dynamic resizing is not yet implemented)
157 @param worker_class: the class to be instantiated for workers;
158 should derive from L{BaseWorker}
159
160 """
161
162 self._lock = threading.Lock()
163 self._pool_to_pool = threading.Condition(self._lock)
164 self._pool_to_worker = threading.Condition(self._lock)
165 self._worker_to_pool = threading.Condition(self._lock)
166 self._worker_class = worker_class
167 self._name = name
168 self._last_worker_id = 0
169 self._workers = []
170 self._quiescing = False
171
172
173 self._termworkers = []
174
175
176 self._tasks = collections.deque()
177
178
179 self.Resize(num_workers)
180
181
182
184 """Adds a task to the queue.
185
186 @param args: arguments passed to L{BaseWorker.RunTask}
187
188 """
189 self._lock.acquire()
190 try:
191
192 while self._quiescing:
193 self._pool_to_pool.wait()
194
195
196 self._tasks.append(args)
197
198
199 self._pool_to_worker.notify()
200 finally:
201 self._lock.release()
202
204 """Returns whether a worker should terminate.
205
206 """
207 return (worker in self._termworkers)
208
218
220 """Checks whether there's a task running in a worker.
221
222 """
223 for worker in self._workers + self._termworkers:
224 if worker._HasRunningTaskUnlocked():
225 return True
226 return False
227
229 """Waits until the task queue is empty.
230
231 """
232 self._lock.acquire()
233 try:
234 self._quiescing = True
235
236
237 while self._tasks or self._HasRunningTasksUnlocked():
238 self._worker_to_pool.wait()
239
240 finally:
241 self._quiescing = False
242
243
244 self._pool_to_pool.notifyAll()
245
246 self._lock.release()
247
249 """Return an identifier for a new worker.
250
251 """
252 self._last_worker_id += 1
253
254 return "%s%d" % (self._name, self._last_worker_id)
255
257 """Changes the number of workers.
258
259 """
260 assert num_workers >= 0, "num_workers must be >= 0"
261
262 logging.debug("Resizing to %s workers", num_workers)
263
264 current_count = len(self._workers)
265
266 if current_count == num_workers:
267
268 pass
269
270 elif current_count > num_workers:
271 if num_workers == 0:
272
273 termworkers = self._workers[:]
274 del self._workers[:]
275 else:
276
277 raise NotImplementedError()
278
279
280 self._termworkers += termworkers
281
282
283 self._pool_to_worker.notifyAll()
284
285
286 self._lock.release()
287 try:
288 for worker in termworkers:
289 logging.debug("Waiting for thread %s", worker.getName())
290 worker.join()
291 finally:
292 self._lock.acquire()
293
294
295
296
297 for worker in termworkers:
298 assert worker in self._termworkers, ("Worker not in list of"
299 " terminating workers")
300 if not worker.isAlive():
301 self._termworkers.remove(worker)
302
303 assert not self._termworkers, "Zombie worker detected"
304
305 elif current_count < num_workers:
306
307 for _ in range(num_workers - current_count):
308 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
309 self._workers.append(worker)
310 worker.start()
311
312 - def Resize(self, num_workers):
313 """Changes the number of workers in the pool.
314
315 @param num_workers: the new number of workers
316
317 """
318 self._lock.acquire()
319 try:
320 return self._ResizeUnlocked(num_workers)
321 finally:
322 self._lock.release()
323
325 """Terminate all worker threads.
326
327 Unstarted tasks will be ignored.
328
329 """
330 logging.debug("Terminating all workers")
331
332 self._lock.acquire()
333 try:
334 self._ResizeUnlocked(0)
335
336 if self._tasks:
337 logging.debug("There are %s tasks left", len(self._tasks))
338 finally:
339 self._lock.release()
340
341 logging.debug("All workers terminated")
342