1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30 from ganeti import compat
31
32
# Sentinel handed to a worker in place of a task when that worker should
# stop: the worker's main loop checks for it by identity ("is") and leaves
# the loop instead of running anything.
_TERMINATE = object()
34
35
37 """Base worker class for worker pools.
38
39 Users of a worker pool must override RunTask in a subclass.
40
41 """
42
44 """Constructor for BaseWorker thread.
45
46 @param pool: the parent worker pool
47 @param worker_id: identifier for this worker
48
49 """
50 super(BaseWorker, self).__init__(name=worker_id)
51 self.pool = pool
52 self._worker_id = worker_id
53 self._current_task = None
54
55 assert self.getName() == worker_id
56
69
71 """Sets the name of the current task.
72
73 Should only be called from within L{RunTask}.
74
75 @type taskname: string
76 @param taskname: Task's name
77
78 """
79 if taskname:
80 name = "%s/%s" % (self._worker_id, taskname)
81 else:
82 name = self._worker_id
83
84
85 self.setName(name)
86
88 """Returns whether this worker is currently running a task.
89
90 """
91 return (self._current_task is not None)
92
94 """Main thread function.
95
96 Waits for new tasks to show up in the queue.
97
98 """
99 pool = self.pool
100
101 while True:
102 assert self._current_task is None
103 try:
104
105 pool._lock.acquire()
106 try:
107 task = pool._WaitForTaskUnlocked(self)
108
109 if task is _TERMINATE:
110
111 break
112
113 if task is None:
114
115 continue
116
117 self._current_task = task
118
119
120 del task
121
122 assert self._HasRunningTaskUnlocked()
123
124 finally:
125 pool._lock.release()
126
127
128 try:
129 logging.debug("Starting task %r", self._current_task)
130 assert self.getName() == self._worker_id
131 try:
132 self.RunTask(*self._current_task)
133 finally:
134 self.SetTaskName(None)
135 logging.debug("Done with task %r", self._current_task)
136 except:
137 logging.exception("Caught unhandled exception")
138
139 assert self._HasRunningTaskUnlocked()
140 finally:
141
142 pool._lock.acquire()
143 try:
144 if self._current_task:
145 self._current_task = None
146 pool._worker_to_pool.notifyAll()
147 finally:
148 pool._lock.release()
149
150 assert not self._HasRunningTaskUnlocked()
151
152 logging.debug("Terminates")
153
155 """Function called to start a task.
156
157 This needs to be implemented by child classes.
158
159 """
160 raise NotImplementedError()
161
162
164 """Worker pool with a queue.
165
166 This class is thread-safe.
167
168 Tasks are guaranteed to be started in the order in which they're
169 added to the pool. Due to the nature of threading, they're not
170 guaranteed to finish in the same order.
171
172 """
def __init__(self, name, num_workers, worker_class):
    """Set up the pool's shared state and start the initial workers.

    @param name: prefix used when building worker identifiers
    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Static configuration
    self._name = name
    self._worker_class = worker_class

    # A single lock guards all pool state; the three condition variables
    # share it and only differ in who signals whom.
    pool_lock = threading.Lock()
    self._lock = pool_lock
    # Signalled when quiescing ends; waited on while the pool is quiescing
    self._pool_to_pool = threading.Condition(pool_lock)
    # Signalled when tasks are added or workers must terminate
    self._pool_to_worker = threading.Condition(pool_lock)
    # Signalled by workers when they pick up or finish a task
    self._worker_to_pool = threading.Condition(pool_lock)

    # Worker bookkeeping
    self._quiescing = False
    self._last_worker_id = 0
    self._workers = []
    # Workers that have been asked to terminate but are not yet reaped
    self._termworkers = []

    # Pending tasks, consumed from the left in insertion order
    self._tasks = collections.deque()

    # Bring the pool up to the requested size
    self.Resize(num_workers)
201
202
203
205 """Wait until the worker pool has finished quiescing.
206
207 """
208 while self._quiescing:
209 self._pool_to_pool.wait()
210
212 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
213
214 self._tasks.append(args)
215
216
217 self._pool_to_worker.notify()
218
232
234 """Add a list of tasks to the queue.
235
236 @type tasks: list of tuples
237 @param tasks: list of args passed to L{BaseWorker.RunTask}
238
239 """
240 assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
241 "Each task must be a sequence"
242
243 self._lock.acquire()
244 try:
245 self._WaitWhileQuiescingUnlocked()
246
247 for args in tasks:
248 self._AddTaskUnlocked(args)
249 finally:
250 self._lock.release()
251
253 """Waits for a task for a worker.
254
255 @type worker: L{BaseWorker}
256 @param worker: Worker thread
257
258 """
259 if self._ShouldWorkerTerminateUnlocked(worker):
260 return _TERMINATE
261
262
263 if not self._tasks:
264 logging.debug("Waiting for tasks")
265
266
267 self._pool_to_worker.wait()
268
269 logging.debug("Notified while waiting")
270
271
272 if self._ShouldWorkerTerminateUnlocked(worker):
273 return _TERMINATE
274
275 if not self._tasks:
276
277 return None
278
279
280 try:
281 return self._tasks.popleft()
282 finally:
283 self._worker_to_pool.notifyAll()
284
286 """Returns whether a worker should terminate.
287
288 """
289 return (worker in self._termworkers)
290
292 """Checks whether there's a task running in a worker.
293
294 """
295 for worker in self._workers + self._termworkers:
296 if worker._HasRunningTaskUnlocked():
297 return True
298 return False
299
301 """Waits until the task queue is empty.
302
303 """
304 self._lock.acquire()
305 try:
306 self._quiescing = True
307
308
309 while self._tasks or self._HasRunningTasksUnlocked():
310 self._worker_to_pool.wait()
311
312 finally:
313 self._quiescing = False
314
315
316 self._pool_to_pool.notifyAll()
317
318 self._lock.release()
319
321 """Return an identifier for a new worker.
322
323 """
324 self._last_worker_id += 1
325
326 return "%s%d" % (self._name, self._last_worker_id)
327
329 """Changes the number of workers.
330
331 """
332 assert num_workers >= 0, "num_workers must be >= 0"
333
334 logging.debug("Resizing to %s workers", num_workers)
335
336 current_count = len(self._workers)
337
338 if current_count == num_workers:
339
340 pass
341
342 elif current_count > num_workers:
343 if num_workers == 0:
344
345 termworkers = self._workers[:]
346 del self._workers[:]
347 else:
348
349 raise NotImplementedError()
350
351
352 self._termworkers += termworkers
353
354
355 self._pool_to_worker.notifyAll()
356
357
358 self._lock.release()
359 try:
360 for worker in termworkers:
361 logging.debug("Waiting for thread %s", worker.getName())
362 worker.join()
363 finally:
364 self._lock.acquire()
365
366
367
368
369 for worker in termworkers:
370 assert worker in self._termworkers, ("Worker not in list of"
371 " terminating workers")
372 if not worker.isAlive():
373 self._termworkers.remove(worker)
374
375 assert not self._termworkers, "Zombie worker detected"
376
377 elif current_count < num_workers:
378
379 for _ in range(num_workers - current_count):
380 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
381 self._workers.append(worker)
382 worker.start()
383
def Resize(self, num_workers):
    """Change the number of workers in the pool.

    Takes the pool lock and delegates to L{_ResizeUnlocked}, passing its
    result back to the caller.

    @param num_workers: the new number of workers

    """
    pool_lock = self._lock
    pool_lock.acquire()
    try:
        outcome = self._ResizeUnlocked(num_workers)
    finally:
        # Always drop the lock, even if resizing raised
        pool_lock.release()
    return outcome
395
397 """Terminate all worker threads.
398
399 Unstarted tasks will be ignored.
400
401 """
402 logging.debug("Terminating all workers")
403
404 self._lock.acquire()
405 try:
406 self._ResizeUnlocked(0)
407
408 if self._tasks:
409 logging.debug("There are %s tasks left", len(self._tasks))
410 finally:
411 self._lock.release()
412
413 logging.debug("All workers terminated")
414