1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import logging
27 import threading
28 import heapq
29
30 from ganeti import compat
31 from ganeti import errors
32
33
34 _TERMINATE = object()
35 _DEFAULT_PRIORITY = 0
36
37
39 """Special exception class to defer a task.
40
41 This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42 task. Optionally, the priority of the task can be changed.
43
44 """
46 """Initializes this class.
47
48 @type priority: number
49 @param priority: New task priority (None means no change)
50
51 """
52 Exception.__init__(self)
53 self.priority = priority
54
55
57 """Base worker class for worker pools.
58
59 Users of a worker pool must override RunTask in a subclass.
60
61 """
62
64 """Constructor for BaseWorker thread.
65
66 @param pool: the parent worker pool
67 @param worker_id: identifier for this worker
68
69 """
70 super(BaseWorker, self).__init__(name=worker_id)
71 self.pool = pool
72 self._worker_id = worker_id
73 self._current_task = None
74
75 assert self.getName() == worker_id
76
89
91 """Returns the priority of the current task.
92
93 Should only be called from within L{RunTask}.
94
95 """
96 self.pool._lock.acquire()
97 try:
98 assert self._HasRunningTaskUnlocked()
99
100 (priority, _, _) = self._current_task
101
102 return priority
103 finally:
104 self.pool._lock.release()
105
107 """Sets the name of the current task.
108
109 Should only be called from within L{RunTask}.
110
111 @type taskname: string
112 @param taskname: Task's name
113
114 """
115 if taskname:
116 name = "%s/%s" % (self._worker_id, taskname)
117 else:
118 name = self._worker_id
119
120
121 self.setName(name)
122
124 """Returns whether this worker is currently running a task.
125
126 """
127 return (self._current_task is not None)
128
130 """Main thread function.
131
132 Waits for new tasks to show up in the queue.
133
134 """
135 pool = self.pool
136
137 while True:
138 assert self._current_task is None
139
140 defer = None
141 try:
142
143 pool._lock.acquire()
144 try:
145 task = pool._WaitForTaskUnlocked(self)
146
147 if task is _TERMINATE:
148
149 break
150
151 if task is None:
152
153 continue
154
155 self._current_task = task
156
157
158 del task
159
160 assert self._HasRunningTaskUnlocked()
161
162 finally:
163 pool._lock.release()
164
165 (priority, _, args) = self._current_task
166 try:
167
168 assert defer is None
169 logging.debug("Starting task %r, priority %s", args, priority)
170 assert self.getName() == self._worker_id
171 try:
172 self.RunTask(*args)
173 finally:
174 self.SetTaskName(None)
175 logging.debug("Done with task %r, priority %s", args, priority)
176 except DeferTask, err:
177 defer = err
178
179 if defer.priority is None:
180
181 defer.priority = priority
182
183 logging.debug("Deferring task %r, new priority %s",
184 args, defer.priority)
185
186 assert self._HasRunningTaskUnlocked()
187 except:
188 logging.exception("Caught unhandled exception")
189
190 assert self._HasRunningTaskUnlocked()
191 finally:
192
193 pool._lock.acquire()
194 try:
195 if defer:
196 assert self._current_task
197
198 (_, _, args) = self._current_task
199 pool._AddTaskUnlocked(args, defer.priority)
200
201 if self._current_task:
202 self._current_task = None
203 pool._worker_to_pool.notifyAll()
204 finally:
205 pool._lock.release()
206
207 assert not self._HasRunningTaskUnlocked()
208
209 logging.debug("Terminates")
210
212 """Function called to start a task.
213
214 This needs to be implemented by child classes.
215
216 """
217 raise NotImplementedError()
218
219
221 """Worker pool with a queue.
222
223 This class is thread-safe.
224
225 Tasks are guaranteed to be started in the order in which they're
226 added to the pool. Due to the nature of threading, they're not
227 guaranteed to finish in the same order.
228
229 """
230 - def __init__(self, name, num_workers, worker_class):
231 """Constructor for worker pool.
232
233 @param num_workers: number of workers to be started
234 (dynamic resizing is not yet implemented)
235 @param worker_class: the class to be instantiated for workers;
236 should derive from L{BaseWorker}
237
238 """
239
240 self._lock = threading.Lock()
241 self._pool_to_pool = threading.Condition(self._lock)
242 self._pool_to_worker = threading.Condition(self._lock)
243 self._worker_to_pool = threading.Condition(self._lock)
244 self._worker_class = worker_class
245 self._name = name
246 self._last_worker_id = 0
247 self._workers = []
248 self._quiescing = False
249
250
251 self._termworkers = []
252
253
254 self._counter = 0
255 self._tasks = []
256
257
258 self.Resize(num_workers)
259
260
261
263 """Wait until the worker pool has finished quiescing.
264
265 """
266 while self._quiescing:
267 self._pool_to_pool.wait()
268
270 """Adds a task to the internal queue.
271
272 @type args: sequence
273 @param args: Arguments passed to L{BaseWorker.RunTask}
274 @type priority: number
275 @param priority: Task priority
276
277 """
278 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
279 assert isinstance(priority, (int, long)), "Priority must be numeric"
280
281
282
283
284 self._counter += 1
285
286 heapq.heappush(self._tasks, (priority, self._counter, args))
287
288
289 self._pool_to_worker.notify()
290
292 """Adds a task to the queue.
293
294 @type args: sequence
295 @param args: arguments passed to L{BaseWorker.RunTask}
296 @type priority: number
297 @param priority: Task priority
298
299 """
300 self._lock.acquire()
301 try:
302 self._WaitWhileQuiescingUnlocked()
303 self._AddTaskUnlocked(args, priority)
304 finally:
305 self._lock.release()
306
308 """Add a list of tasks to the queue.
309
310 @type tasks: list of tuples
311 @param tasks: list of args passed to L{BaseWorker.RunTask}
312 @type priority: number or list of numbers
313 @param priority: Priority for all added tasks or a list with the priority
314 for each task
315
316 """
317 assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
318 "Each task must be a sequence"
319
320 assert (isinstance(priority, (int, long)) or
321 compat.all(isinstance(prio, (int, long)) for prio in priority)), \
322 "Priority must be numeric or be a list of numeric values"
323
324 if isinstance(priority, (int, long)):
325 priority = [priority] * len(tasks)
326 elif len(priority) != len(tasks):
327 raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
328 " number of tasks (%s)" %
329 (len(priority), len(tasks)))
330
331 self._lock.acquire()
332 try:
333 self._WaitWhileQuiescingUnlocked()
334
335 assert compat.all(isinstance(prio, (int, long)) for prio in priority)
336 assert len(tasks) == len(priority)
337
338 for args, priority in zip(tasks, priority):
339 self._AddTaskUnlocked(args, priority)
340 finally:
341 self._lock.release()
342
344 """Waits for a task for a worker.
345
346 @type worker: L{BaseWorker}
347 @param worker: Worker thread
348
349 """
350 if self._ShouldWorkerTerminateUnlocked(worker):
351 return _TERMINATE
352
353
354 if not self._tasks:
355 logging.debug("Waiting for tasks")
356
357
358 self._pool_to_worker.wait()
359
360 logging.debug("Notified while waiting")
361
362
363 if self._ShouldWorkerTerminateUnlocked(worker):
364 return _TERMINATE
365
366 if not self._tasks:
367
368 return None
369
370
371 try:
372 return heapq.heappop(self._tasks)
373 finally:
374 self._worker_to_pool.notifyAll()
375
377 """Returns whether a worker should terminate.
378
379 """
380 return (worker in self._termworkers)
381
383 """Checks whether there's a task running in a worker.
384
385 """
386 for worker in self._workers + self._termworkers:
387 if worker._HasRunningTaskUnlocked():
388 return True
389 return False
390
392 """Waits until the task queue is empty.
393
394 """
395 self._lock.acquire()
396 try:
397 self._quiescing = True
398
399
400 while self._tasks or self._HasRunningTasksUnlocked():
401 self._worker_to_pool.wait()
402
403 finally:
404 self._quiescing = False
405
406
407 self._pool_to_pool.notifyAll()
408
409 self._lock.release()
410
412 """Return an identifier for a new worker.
413
414 """
415 self._last_worker_id += 1
416
417 return "%s%d" % (self._name, self._last_worker_id)
418
420 """Changes the number of workers.
421
422 """
423 assert num_workers >= 0, "num_workers must be >= 0"
424
425 logging.debug("Resizing to %s workers", num_workers)
426
427 current_count = len(self._workers)
428
429 if current_count == num_workers:
430
431 pass
432
433 elif current_count > num_workers:
434 if num_workers == 0:
435
436 termworkers = self._workers[:]
437 del self._workers[:]
438 else:
439
440 raise NotImplementedError()
441
442
443 self._termworkers += termworkers
444
445
446 self._pool_to_worker.notifyAll()
447
448
449 self._lock.release()
450 try:
451 for worker in termworkers:
452 logging.debug("Waiting for thread %s", worker.getName())
453 worker.join()
454 finally:
455 self._lock.acquire()
456
457
458
459
460 for worker in termworkers:
461 assert worker in self._termworkers, ("Worker not in list of"
462 " terminating workers")
463 if not worker.isAlive():
464 self._termworkers.remove(worker)
465
466 assert not self._termworkers, "Zombie worker detected"
467
468 elif current_count < num_workers:
469
470 for _ in range(num_workers - current_count):
471 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
472 self._workers.append(worker)
473 worker.start()
474
475 - def Resize(self, num_workers):
476 """Changes the number of workers in the pool.
477
478 @param num_workers: the new number of workers
479
480 """
481 self._lock.acquire()
482 try:
483 return self._ResizeUnlocked(num_workers)
484 finally:
485 self._lock.release()
486
488 """Terminate all worker threads.
489
490 Unstarted tasks will be ignored.
491
492 """
493 logging.debug("Terminating all workers")
494
495 self._lock.acquire()
496 try:
497 self._ResizeUnlocked(0)
498
499 if self._tasks:
500 logging.debug("There are %s tasks left", len(self._tasks))
501 finally:
502 self._lock.release()
503
504 logging.debug("All workers terminated")
505