1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Base classes for worker pools.

Provides L{BaseWorker}, the thread class that pool workers derive from, the
L{DeferTask} exception a running task can raise to be put back into the
queue, and the pool class that owns the task queue and the worker threads.

"""

import logging
import threading
import heapq
import itertools

from ganeti import compat
from ganeti import errors


# Unique sentinel returned to a worker by the pool's task-wait routine to make
# the worker leave its main loop; it is only ever compared by identity ("is")
_TERMINATE = object()
# NOTE(review): not referenced by any code visible here; presumably the
# default priority for newly added tasks -- confirm against the method
# signatures of the pool class
_DEFAULT_PRIORITY = 0
46
47
class DeferTask(Exception):
  """Exception a running task raises to be executed again later.

  When L{BaseWorker.RunTask} raises this, the worker puts the current task
  back into the pool's queue instead of considering it finished. The task's
  priority can optionally be changed at the same time.

  """
  def __init__(self, priority=None):
    """Creates the deferral request.

    @type priority: number
    @param priority: Priority to re-queue the task with; None keeps the
      task's current priority

    """
    # Independent of the base class initialisation below
    self.priority = priority
    Exception.__init__(self)
64
65
class NoSuchTask(Exception):
  """Signals that a task could not be found.

  Raised, for example, when trying to change the priority of a task that is
  not (or no longer) waiting in the queue.

  """
  pass
70
71
73 """Base worker class for worker pools.
74
75 Users of a worker pool must override RunTask in a subclass.
76
77 """
78
80 """Constructor for BaseWorker thread.
81
82 @param pool: the parent worker pool
83 @param worker_id: identifier for this worker
84
85 """
86 super(BaseWorker, self).__init__(name=worker_id)
87 self.pool = pool
88 self._worker_id = worker_id
89 self._current_task = None
90
91 assert self.getName() == worker_id
92
105
107 """Returns the priority of the current task.
108
109 Should only be called from within L{RunTask}.
110
111 """
112 self.pool._lock.acquire()
113 try:
114 assert self._HasRunningTaskUnlocked()
115
116 (priority, _, _, _) = self._current_task
117
118 return priority
119 finally:
120 self.pool._lock.release()
121
123 """Sets the name of the current task.
124
125 Should only be called from within L{RunTask}.
126
127 @type taskname: string
128 @param taskname: Task's name
129
130 """
131 if taskname:
132 name = "%s/%s" % (self._worker_id, taskname)
133 else:
134 name = self._worker_id
135
136
137 self.setName(name)
138
140 """Returns whether this worker is currently running a task.
141
142 """
143 return (self._current_task is not None)
144
146 """Returns the order and task ID of the current task.
147
148 Should only be called from within L{RunTask}.
149
150 """
151 self.pool._lock.acquire()
152 try:
153 assert self._HasRunningTaskUnlocked()
154
155 (_, order_id, task_id, _) = self._current_task
156
157 return (order_id, task_id)
158 finally:
159 self.pool._lock.release()
160
162 """Main thread function.
163
164 Waits for new tasks to show up in the queue.
165
166 """
167 pool = self.pool
168
169 while True:
170 assert self._current_task is None
171
172 defer = None
173 try:
174
175 pool._lock.acquire()
176 try:
177 task = pool._WaitForTaskUnlocked(self)
178
179 if task is _TERMINATE:
180
181 break
182
183 if task is None:
184
185 continue
186
187 self._current_task = task
188
189
190 del task
191
192 assert self._HasRunningTaskUnlocked()
193
194 finally:
195 pool._lock.release()
196
197 (priority, _, _, args) = self._current_task
198 try:
199
200 assert defer is None
201 logging.debug("Starting task %r, priority %s", args, priority)
202 assert self.getName() == self._worker_id
203 try:
204 self.RunTask(*args)
205 finally:
206 self.SetTaskName(None)
207 logging.debug("Done with task %r, priority %s", args, priority)
208 except DeferTask, err:
209 defer = err
210
211 if defer.priority is None:
212
213 defer.priority = priority
214
215 logging.debug("Deferring task %r, new priority %s",
216 args, defer.priority)
217
218 assert self._HasRunningTaskUnlocked()
219 except:
220 logging.exception("Caught unhandled exception")
221
222 assert self._HasRunningTaskUnlocked()
223 finally:
224
225 pool._lock.acquire()
226 try:
227 if defer:
228 assert self._current_task
229
230 (_, _, task_id, args) = self._current_task
231 pool._AddTaskUnlocked(args, defer.priority, task_id)
232
233 if self._current_task:
234 self._current_task = None
235 pool._worker_to_pool.notifyAll()
236 finally:
237 pool._lock.release()
238
239 assert not self._HasRunningTaskUnlocked()
240
241 logging.debug("Terminates")
242
244 """Function called to start a task.
245
246 This needs to be implemented by child classes.
247
248 """
249 raise NotImplementedError()
250
251
class WorkerPool(object):
  """Worker pool with a prioritized task queue.

  This class is thread-safe.

  Queued tasks are handed to workers in order of priority (smallest number
  first, since the queue is a C{heapq} min-heap) and, among tasks of equal
  priority, in the order in which they were added. Due to the nature of
  threading, they're not guaranteed to finish in the same order.

  @type _tasks: list of lists
  @ivar _tasks: Heap of queued entries, each of the form [priority, order ID,
    task ID, arguments]. Priority and order ID are numeric and determine the
    sort order; the order ID comes from an ever-increasing counter and thus
    records insertion order. The task ID is supplied by the user of the pool
    (see L{AddTask}). The arguments are C{None} for abandoned entries,
    otherwise the sequence of arguments passed to L{BaseWorker.RunTask}. The
    list must satisfy the heap property required by the C{heapq} module.
  @type _taskdata: dict; (task IDs as keys, lists as values)
  @ivar _taskdata: Mapping from task IDs to their current entry in L{_tasks}

  """
  def __init__(self, name, num_workers, worker_class):
    """Creates the pool and starts its initial workers.

    @param name: prefix for the identifiers of the workers
    @param num_workers: number of workers to be started
      (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
      should derive from L{BaseWorker}

    """
    # One lock protects all pool state; every condition variable shares it
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)

    # Worker management
    self._name = name
    self._worker_class = worker_class
    self._last_worker_id = 0
    self._workers = []
    # Workers asked to terminate which may still be running
    self._termworkers = []
    self._quiescing = False

    # Task queue
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start the initial set of workers
    self.Resize(num_workers)

  def _WaitWhileQuiescingUnlocked(self):
    """Blocks until the pool is no longer quiescing.

    Must be called with the pool lock held.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority, task_id):
    """Puts a single task into the queue; the pool lock must be held.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID (an integer or None)

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"
    assert task_id is None or isinstance(task_id, (int, long)), \
      "Task ID must be numeric or None"

    # A list instead of a tuple so that the entry can later be marked as
    # abandoned in place (see ChangeTaskPriority)
    entry = [priority, self._counter.next(), task_id, args]

    if task_id is not None:
      assert task_id not in self._taskdata

      # Remember the entry so that its priority can be changed later
      self._taskdata[task_id] = entry

    # The order ID keeps tasks of equal priority in insertion order
    heapq.heappush(self._tasks, entry)

    # Wake up one waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID is used as a dictionary key and must be an integer or
      None (checked by an assertion when the task is queued). Callers must
      ensure a task ID is unique while a task is in the pool or while it
      might return to the pool due to deferring using L{DeferTask}.

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
      for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @raise errors.ProgrammerError: if the number of priorities or task IDs
      differs from the number of tasks
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
           "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
      "Task IDs must be in a sequence"

    num_tasks = len(tasks)

    # Expand a single priority to one per task
    if isinstance(priority, (int, long)):
      priorities = [priority] * num_tasks
    elif len(priority) == num_tasks:
      priorities = priority
    else:
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), num_tasks))

    # Without explicit IDs, no task gets one
    if task_id is None:
      task_ids = [None] * num_tasks
    elif len(task_id) == num_tasks:
      task_ids = task_id
    else:
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(task_id), num_tasks))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priorities)
      assert len(tasks) == len(priorities)
      assert len(tasks) == len(task_ids)

      for (task_args, task_prio, tid) in zip(tasks, priorities, task_ids):
        self._AddTaskUnlocked(task_args, task_prio, tid)
    finally:
      self._lock.release()

  def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      current = self._taskdata.get(task_id)
      if current is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Copy of the entry with the new priority; the order ID is kept. This
      # must happen before the old entry is modified below.
      replacement = [priority] + current[1:]

      # Removing an arbitrary entry from a heap is expensive, so the old entry
      # is only marked as abandoned (no arguments); workers skip such entries
      current[-1] = None

      # Future priority changes must operate on the new entry
      assert task_id is not None
      self._taskdata[task_id] = replacement

      heapq.heappush(self._tasks, replacement)

      # Wake up one waiting worker
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker; the pool lock must be held.

    @type worker: L{BaseWorker}
    @param worker: Worker thread
    @return: C{_TERMINATE} if the worker should exit, otherwise the queue
      entry of the next task to run

    """
    while True:
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      if not self._tasks:
        logging.debug("Waiting for tasks")

        # Releases the lock while sleeping
        self._pool_to_worker.wait()

        logging.debug("Notified while waiting")
        continue

      try:
        entry = heapq.heappop(self._tasks)
      finally:
        # The queue changed; let anyone waiting for it to drain re-check
        self._worker_to_pool.notifyAll()

      (_, _, entry_id, entry_args) = entry

      if entry_args is None:
        # Left behind by a priority change, skip it
        logging.debug("Found abandoned task (%r)", entry)
        continue

      # The task is no longer queued, so its ID is forgotten
      if entry_id is not None:
        del self._taskdata[entry_id]

      return entry

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Tells whether a worker has been asked to terminate.

    """
    return worker in self._termworkers

  def _HasRunningTasksUnlocked(self):
    """Tells whether any worker, terminating or not, is running a task.

    """
    for candidate in itertools.chain(self._workers, self._termworkers):
      if candidate._HasRunningTaskUnlocked():
        return True
    return False

  def Quiesce(self):
    """Waits until no task is queued or running.

    While this is in progress, adding tasks blocks.

    """
    self._lock.acquire()
    try:
      self._quiescing = True
      try:
        while self._tasks or self._HasRunningTasksUnlocked():
          self._worker_to_pool.wait()
      finally:
        self._quiescing = False

        # Let blocked task-adding callers continue
        self._pool_to_pool.notifyAll()
    finally:
      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Returns an identifier for a new worker (pool name plus a counter).

    """
    self._last_worker_id += 1
    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers; the pool lock must be held.

    Only growing the pool and shrinking it to zero are supported.

    @raise NotImplementedError: when shrinking to a non-zero size

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count < num_workers:
      # Grow: start the missing workers
      for _ in range(num_workers - current_count):
        new_worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(new_worker)
        new_worker.start()
      return

    if current_count == num_workers:
      return

    # Shrinking to a non-zero number of workers is not supported
    if num_workers != 0:
      raise NotImplementedError()

    # Stop all workers
    termworkers = self._workers[:]
    del self._workers[:]

    self._termworkers.extend(termworkers)

    # Wake up idle workers so they notice they should terminate
    self._pool_to_worker.notifyAll()

    # The workers need the lock to exit, so it is released while joining
    self._lock.release()
    try:
      for stopping in termworkers:
        logging.debug("Waiting for thread %s", stopping.getName())
        stopping.join()
    finally:
      self._lock.acquire()

    # Forget only threads that really stopped, so zombies are detected
    for stopping in termworkers:
      assert stopping in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
      if not stopping.isAlive():
        self._termworkers.remove(stopping)

    assert not self._termworkers, "Zombie worker detected"

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminates all worker threads.

    Tasks that have not been started are left in the queue and not run.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      remaining = len(self._tasks)
      if remaining:
        logging.debug("There are %s tasks left", remaining)
    finally:
      self._lock.release()

    logging.debug("All workers terminated")
637