1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import logging
27 import threading
28 import heapq
29
30 from ganeti import compat
31 from ganeti import errors
32
33
# Unique sentinel handed to a worker instead of a task when the worker has to
# leave its main loop; it is compared by identity ("is"), never by value.
_TERMINATE = object()
# NOTE(review): not referenced in the visible part of this file; presumably
# the priority given to tasks added without an explicit one -- confirm.
_DEFAULT_PRIORITY = 0
36
37
39 """Special exception class to defer a task.
40
41 This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42 task. Optionally, the priority of the task can be changed.
43
44 """
46 """Initializes this class.
47
48 @type priority: number
49 @param priority: New task priority (None means no change)
50
51 """
52 Exception.__init__(self)
53 self.priority = priority
54
55
57 """Base worker class for worker pools.
58
59 Users of a worker pool must override RunTask in a subclass.
60
61 """
62
64 """Constructor for BaseWorker thread.
65
66 @param pool: the parent worker pool
67 @param worker_id: identifier for this worker
68
69 """
70 super(BaseWorker, self).__init__(name=worker_id)
71 self.pool = pool
72 self._worker_id = worker_id
73 self._current_task = None
74
75 assert self.getName() == worker_id
76
89
91 """Returns the priority of the current task.
92
93 Should only be called from within L{RunTask}.
94
95 """
96 self.pool._lock.acquire()
97 try:
98 assert self._HasRunningTaskUnlocked()
99
100 (priority, _, _) = self._current_task
101
102 return priority
103 finally:
104 self.pool._lock.release()
105
107 """Sets the name of the current task.
108
109 Should only be called from within L{RunTask}.
110
111 @type taskname: string
112 @param taskname: Task's name
113
114 """
115 if taskname:
116 name = "%s/%s" % (self._worker_id, taskname)
117 else:
118 name = self._worker_id
119
120
121 self.setName(name)
122
124 """Returns whether this worker is currently running a task.
125
126 """
127 return (self._current_task is not None)
128
130 """Main thread function.
131
132 Waits for new tasks to show up in the queue.
133
134 """
135 pool = self.pool
136
137 while True:
138 assert self._current_task is None
139
140 defer = None
141 try:
142
143 pool._lock.acquire()
144 try:
145 task = pool._WaitForTaskUnlocked(self)
146
147 if task is _TERMINATE:
148
149 break
150
151 if task is None:
152
153 continue
154
155 self._current_task = task
156
157
158 del task
159
160 assert self._HasRunningTaskUnlocked()
161
162 finally:
163 pool._lock.release()
164
165 (priority, _, args) = self._current_task
166 try:
167
168 assert defer is None
169 logging.debug("Starting task %r, priority %s", args, priority)
170 assert self.getName() == self._worker_id
171 try:
172 self.RunTask(*args)
173 finally:
174 self.SetTaskName(None)
175 logging.debug("Done with task %r, priority %s", args, priority)
176 except DeferTask, err:
177 defer = err
178
179 if defer.priority is None:
180
181 defer.priority = priority
182
183 logging.debug("Deferring task %r, new priority %s",
184 args, defer.priority)
185
186 assert self._HasRunningTaskUnlocked()
187 except:
188 logging.exception("Caught unhandled exception")
189
190 assert self._HasRunningTaskUnlocked()
191 finally:
192
193 pool._lock.acquire()
194 try:
195 if defer:
196 assert self._current_task
197
198 (_, _, args) = self._current_task
199 pool._AddTaskUnlocked(args, defer.priority)
200
201 if self._current_task:
202 self._current_task = None
203 pool._worker_to_pool.notifyAll()
204 finally:
205 pool._lock.release()
206
207 assert not self._HasRunningTaskUnlocked()
208
209 logging.debug("Terminates")
210
212 """Function called to start a task.
213
214 This needs to be implemented by child classes.
215
216 """
217 raise NotImplementedError()
218
219
221 """Worker pool with a queue.
222
223 This class is thread-safe.
224
225 Tasks are guaranteed to be started in the order in which they're
226 added to the pool. Due to the nature of threading, they're not
227 guaranteed to finish in the same order.
228
229 """
def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix used when building the identifier of each new worker
    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # A single lock protects all pool state. The three condition variables
    # share that lock; worker threads reach the lock and the conditions
    # directly through their reference to the pool.
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Workers that were told to terminate but have not been removed yet
    self._termworkers = []

    # Queued tasks: "_tasks" is used as a heap of
    # (priority, counter, args) tuples; "_counter" grows with every added
    # task, so entries of equal priority keep their insertion order.
    self._counter = 0
    self._tasks = []

    # Start the workers last, once every attribute they rely on exists
    self.Resize(num_workers)
260
261
262
264 """Wait until the worker pool has finished quiescing.
265
266 """
267 while self._quiescing:
268 self._pool_to_pool.wait()
269
271 """Adds a task to the internal queue.
272
273 @type args: sequence
274 @param args: Arguments passed to L{BaseWorker.RunTask}
275 @type priority: number
276 @param priority: Task priority
277
278 """
279 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
280 assert isinstance(priority, (int, long)), "Priority must be numeric"
281
282
283
284
285 self._counter += 1
286
287 heapq.heappush(self._tasks, (priority, self._counter, args))
288
289
290 self._pool_to_worker.notify()
291
293 """Adds a task to the queue.
294
295 @type args: sequence
296 @param args: arguments passed to L{BaseWorker.RunTask}
297 @type priority: number
298 @param priority: Task priority
299
300 """
301 self._lock.acquire()
302 try:
303 self._WaitWhileQuiescingUnlocked()
304 self._AddTaskUnlocked(args, priority)
305 finally:
306 self._lock.release()
307
309 """Add a list of tasks to the queue.
310
311 @type tasks: list of tuples
312 @param tasks: list of args passed to L{BaseWorker.RunTask}
313 @type priority: number or list of numbers
314 @param priority: Priority for all added tasks or a list with the priority
315 for each task
316
317 """
318 assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
319 "Each task must be a sequence"
320
321 assert (isinstance(priority, (int, long)) or
322 compat.all(isinstance(prio, (int, long)) for prio in priority)), \
323 "Priority must be numeric or be a list of numeric values"
324
325 if isinstance(priority, (int, long)):
326 priority = [priority] * len(tasks)
327 elif len(priority) != len(tasks):
328 raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
329 " number of tasks (%s)" %
330 (len(priority), len(tasks)))
331
332 self._lock.acquire()
333 try:
334 self._WaitWhileQuiescingUnlocked()
335
336 assert compat.all(isinstance(prio, (int, long)) for prio in priority)
337 assert len(tasks) == len(priority)
338
339 for args, priority in zip(tasks, priority):
340 self._AddTaskUnlocked(args, priority)
341 finally:
342 self._lock.release()
343
345 """Enable/disable processing of tasks.
346
347 This is different from L{Quiesce} in the sense that this function just
348 changes an internal flag and doesn't wait for the queue to be empty. Tasks
349 already being processed continue normally, but no new tasks will be
350 started. New tasks can still be added.
351
352 @type active: bool
353 @param active: Whether tasks should be processed
354
355 """
356 self._lock.acquire()
357 try:
358 self._active = active
359
360 if active:
361
362 self._pool_to_worker.notifyAll()
363 finally:
364 self._lock.release()
365
367 """Waits for a task for a worker.
368
369 @type worker: L{BaseWorker}
370 @param worker: Worker thread
371
372 """
373 if self._ShouldWorkerTerminateUnlocked(worker):
374 return _TERMINATE
375
376
377 if not (self._active and self._tasks):
378 logging.debug("Waiting for tasks")
379
380 while True:
381
382 self._pool_to_worker.wait()
383
384 logging.debug("Notified while waiting")
385
386
387 if self._ShouldWorkerTerminateUnlocked(worker):
388 return _TERMINATE
389
390
391 if self._active and self._tasks:
392 break
393
394
395 try:
396 return heapq.heappop(self._tasks)
397 finally:
398 self._worker_to_pool.notifyAll()
399
401 """Returns whether a worker should terminate.
402
403 """
404 return (worker in self._termworkers)
405
407 """Checks whether there's a task running in a worker.
408
409 """
410 for worker in self._workers + self._termworkers:
411 if worker._HasRunningTaskUnlocked():
412 return True
413 return False
414
424
426 """Waits until the task queue is empty.
427
428 """
429 self._lock.acquire()
430 try:
431 self._quiescing = True
432
433
434 while self._tasks or self._HasRunningTasksUnlocked():
435 self._worker_to_pool.wait()
436
437 finally:
438 self._quiescing = False
439
440
441 self._pool_to_pool.notifyAll()
442
443 self._lock.release()
444
446 """Return an identifier for a new worker.
447
448 """
449 self._last_worker_id += 1
450
451 return "%s%d" % (self._name, self._last_worker_id)
452
454 """Changes the number of workers.
455
456 """
457 assert num_workers >= 0, "num_workers must be >= 0"
458
459 logging.debug("Resizing to %s workers", num_workers)
460
461 current_count = len(self._workers)
462
463 if current_count == num_workers:
464
465 pass
466
467 elif current_count > num_workers:
468 if num_workers == 0:
469
470 termworkers = self._workers[:]
471 del self._workers[:]
472 else:
473
474 raise NotImplementedError()
475
476
477 self._termworkers += termworkers
478
479
480 self._pool_to_worker.notifyAll()
481
482
483 self._lock.release()
484 try:
485 for worker in termworkers:
486 logging.debug("Waiting for thread %s", worker.getName())
487 worker.join()
488 finally:
489 self._lock.acquire()
490
491
492
493
494 for worker in termworkers:
495 assert worker in self._termworkers, ("Worker not in list of"
496 " terminating workers")
497 if not worker.isAlive():
498 self._termworkers.remove(worker)
499
500 assert not self._termworkers, "Zombie worker detected"
501
502 elif current_count < num_workers:
503
504 for _ in range(num_workers - current_count):
505 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
506 self._workers.append(worker)
507 worker.start()
508
def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Takes the pool lock and delegates the actual work to
    L{_ResizeUnlocked}; the lock is released again even if resizing fails.

    @param num_workers: the new number of workers
    @return: whatever L{_ResizeUnlocked} returns

    """
    lock = self._lock

    # Explicit acquire/release (rather than a "with" block) matches the
    # locking style used throughout the rest of this file.
    lock.acquire()
    try:
        result = self._ResizeUnlocked(num_workers)
    finally:
        lock.release()

    return result
520
522 """Terminate all worker threads.
523
524 Unstarted tasks will be ignored.
525
526 """
527 logging.debug("Terminating all workers")
528
529 self._lock.acquire()
530 try:
531 self._ResizeUnlocked(0)
532
533 if self._tasks:
534 logging.debug("There are %s tasks left", len(self._tasks))
535 finally:
536 self._lock.release()
537
538 logging.debug("All workers terminated")
539