1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Base classes for worker pools.
32
33 """
34
35 import logging
36 import threading
37 import heapq
38 import itertools
39
40 from ganeti import compat
41 from ganeti import errors
42
43
44 _TERMINATE = object()
45 _DEFAULT_PRIORITY = 0
46
47
49 """Special exception class to defer a task.
50
51 This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
52 task. Optionally, the priority of the task can be changed.
53
54 """
56 """Initializes this class.
57
58 @type priority: number
59 @param priority: New task priority (None means no change)
60
61 """
62 Exception.__init__(self)
63 self.priority = priority
64
65
67 """Exception raised when a task can't be found.
68
69 """
70
71
73 """Base worker class for worker pools.
74
75 Users of a worker pool must override RunTask in a subclass.
76
77 """
78
80 """Constructor for BaseWorker thread.
81
82 @param pool: the parent worker pool
83 @param worker_id: identifier for this worker
84
85 """
86 super(BaseWorker, self).__init__(name=worker_id)
87 self.pool = pool
88 self._worker_id = worker_id
89 self._current_task = None
90
91 assert self.getName() == worker_id
92
105
107 """Returns the priority of the current task.
108
109 Should only be called from within L{RunTask}.
110
111 """
112 self.pool._lock.acquire()
113 try:
114 assert self._HasRunningTaskUnlocked()
115
116 (priority, _, _, _) = self._current_task
117
118 return priority
119 finally:
120 self.pool._lock.release()
121
123 """Sets the name of the current task.
124
125 Should only be called from within L{RunTask}.
126
127 @type taskname: string
128 @param taskname: Task's name
129
130 """
131 if taskname:
132 name = "%s/%s" % (self._worker_id, taskname)
133 else:
134 name = self._worker_id
135
136
137 self.setName(name)
138
140 """Returns whether this worker is currently running a task.
141
142 """
143 return (self._current_task is not None)
144
146 """Returns the order and task ID of the current task.
147
148 Should only be called from within L{RunTask}.
149
150 """
151 self.pool._lock.acquire()
152 try:
153 assert self._HasRunningTaskUnlocked()
154
155 (_, order_id, task_id, _) = self._current_task
156
157 return (order_id, task_id)
158 finally:
159 self.pool._lock.release()
160
162 """Main thread function.
163
164 Waits for new tasks to show up in the queue.
165
166 """
167 pool = self.pool
168
169 while True:
170 assert self._current_task is None
171
172 defer = None
173 try:
174
175 pool._lock.acquire()
176 try:
177 task = pool._WaitForTaskUnlocked(self)
178
179 if task is _TERMINATE:
180
181 break
182
183 if task is None:
184
185 continue
186
187 self._current_task = task
188
189
190 del task
191
192 assert self._HasRunningTaskUnlocked()
193
194 finally:
195 pool._lock.release()
196
197 (priority, _, _, args) = self._current_task
198 try:
199
200 assert defer is None
201 logging.debug("Starting task %r, priority %s", args, priority)
202 assert self.getName() == self._worker_id
203 try:
204 self.RunTask(*args)
205 finally:
206 self.SetTaskName(None)
207 logging.debug("Done with task %r, priority %s", args, priority)
208 except DeferTask, err:
209 defer = err
210
211 if defer.priority is None:
212
213 defer.priority = priority
214
215 logging.debug("Deferring task %r, new priority %s",
216 args, defer.priority)
217
218 assert self._HasRunningTaskUnlocked()
219 except:
220 logging.exception("Caught unhandled exception")
221
222 assert self._HasRunningTaskUnlocked()
223 finally:
224
225 pool._lock.acquire()
226 try:
227 if defer:
228 assert self._current_task
229
230 (_, _, task_id, args) = self._current_task
231 pool._AddTaskUnlocked(args, defer.priority, task_id)
232
233 if self._current_task:
234 self._current_task = None
235 pool._worker_to_pool.notifyAll()
236 finally:
237 pool._lock.release()
238
239 assert not self._HasRunningTaskUnlocked()
240
241 logging.debug("Terminates")
242
244 """Function called to start a task.
245
246 This needs to be implemented by child classes.
247
248 """
249 raise NotImplementedError()
250
251
253 """Worker pool with a queue.
254
255 This class is thread-safe.
256
257 Tasks are guaranteed to be started in the order in which they're
258 added to the pool. Due to the nature of threading, they're not
259 guaranteed to finish in the same order.
260
261 @type _tasks: list of tuples
262 @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
263 arguments). Priority and order ID are numeric and essentially control the
264 sort order. The order ID is an increasing number denoting the order in
265 which tasks are added to the queue. The task ID is controlled by user of
266 workerpool, see L{AddTask} for details. The task arguments are C{None} for
267 abandoned tasks, otherwise a sequence of arguments to be passed to
268 L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
269 the C{heapq} module).
270 @type _taskdata: dict; (task IDs as keys, tuples as values)
271 @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
272
273 """
def __init__(self, name, num_workers, worker_class):
    """Sets up the pool state and starts the initial workers.

    @param name: pool name; used as the prefix of generated worker IDs
    @param num_workers: number of workers to be started
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # A single lock guards all pool state; worker threads take it as well
    pool_lock = threading.Lock()
    self._lock = pool_lock

    # Condition variables, all bound to the pool lock:
    # - _pool_to_pool: signalled once quiescing has finished
    # - _pool_to_worker: signalled when workers should look at the queue
    # - _worker_to_pool: signalled by workers when queue/task state changes
    self._pool_to_pool = threading.Condition(pool_lock)
    self._pool_to_worker = threading.Condition(pool_lock)
    self._worker_to_pool = threading.Condition(pool_lock)

    # State flags
    self._active = True
    self._quiescing = False

    # Worker bookkeeping (running workers and those being terminated)
    self._name = name
    self._worker_class = worker_class
    self._last_worker_id = 0
    self._workers = []
    self._termworkers = []

    # Queued tasks: heap of entries, lookup by task ID, and the counter
    # providing the increasing order ID
    self._tasks = []
    self._taskdata = {}
    self._counter = itertools.count()

    # Must stay last: starting workers requires all state above to exist
    self.Resize(num_workers)
305
306
307
309 """Wait until the worker pool has finished quiescing.
310
311 """
312 while self._quiescing:
313 self._pool_to_pool.wait()
314
316 """Adds a task to the internal queue.
317
318 @type args: sequence
319 @param args: Arguments passed to L{BaseWorker.RunTask}
320 @type priority: number
321 @param priority: Task priority
322 @param task_id: Task ID
323
324 """
325 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
326 assert isinstance(priority, (int, long)), "Priority must be numeric"
327 assert task_id is None or isinstance(task_id, (int, long)), \
328 "Task ID must be numeric or None"
329
330 task = [priority, self._counter.next(), task_id, args]
331
332 if task_id is not None:
333 assert task_id not in self._taskdata
334
335 self._taskdata[task_id] = task
336
337
338
339 heapq.heappush(self._tasks, task)
340
341
342 self._pool_to_worker.notify()
343
345 """Adds a task to the queue.
346
347 @type args: sequence
348 @param args: arguments passed to L{BaseWorker.RunTask}
349 @type priority: number
350 @param priority: Task priority
351 @param task_id: Task ID
352 @note: The task ID can be essentially anything that can be used as a
353 dictionary key. Callers, however, must ensure a task ID is unique while a
354 task is in the pool or while it might return to the pool due to deferring
355 using L{DeferTask}.
356
357 """
358 self._lock.acquire()
359 try:
360 self._WaitWhileQuiescingUnlocked()
361 self._AddTaskUnlocked(args, priority, task_id)
362 finally:
363 self._lock.release()
364
366 """Add a list of tasks to the queue.
367
368 @type tasks: list of tuples
369 @param tasks: list of args passed to L{BaseWorker.RunTask}
370 @type priority: number or list of numbers
371 @param priority: Priority for all added tasks or a list with the priority
372 for each task
373 @type task_id: list
374 @param task_id: List with the ID for each task
375 @note: See L{AddTask} for a note on task IDs.
376
377 """
378 assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
379 "Each task must be a sequence"
380 assert (isinstance(priority, (int, long)) or
381 compat.all(isinstance(prio, (int, long)) for prio in priority)), \
382 "Priority must be numeric or be a list of numeric values"
383 assert task_id is None or isinstance(task_id, (tuple, list)), \
384 "Task IDs must be in a sequence"
385
386 if isinstance(priority, (int, long)):
387 priority = [priority] * len(tasks)
388 elif len(priority) != len(tasks):
389 raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
390 " number of tasks (%s)" %
391 (len(priority), len(tasks)))
392
393 if task_id is None:
394 task_id = [None] * len(tasks)
395 elif len(task_id) != len(tasks):
396 raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
397 " number of tasks (%s)" %
398 (len(task_id), len(tasks)))
399
400 self._lock.acquire()
401 try:
402 self._WaitWhileQuiescingUnlocked()
403
404 assert compat.all(isinstance(prio, (int, long)) for prio in priority)
405 assert len(tasks) == len(priority)
406 assert len(tasks) == len(task_id)
407
408 for (args, prio, tid) in zip(tasks, priority, task_id):
409 self._AddTaskUnlocked(args, prio, tid)
410 finally:
411 self._lock.release()
412
414 """Changes a task's priority.
415
416 @param task_id: Task ID
417 @type priority: number
418 @param priority: New task priority
419 @raise NoSuchTask: When the task referred by C{task_id} can not be found
420 (it may never have existed, may have already been processed, or is
421 currently running)
422
423 """
424 assert isinstance(priority, (int, long)), "Priority must be numeric"
425
426 self._lock.acquire()
427 try:
428 logging.debug("About to change priority of task %s to %s",
429 task_id, priority)
430
431
432 oldtask = self._taskdata.get(task_id, None)
433 if oldtask is None:
434 msg = "Task '%s' was not found" % task_id
435 logging.debug(msg)
436 raise NoSuchTask(msg)
437
438
439 newtask = [priority] + oldtask[1:]
440
441
442
443
444
445 oldtask[-1] = None
446
447
448 assert task_id is not None
449 self._taskdata[task_id] = newtask
450
451
452 heapq.heappush(self._tasks, newtask)
453
454
455 self._pool_to_worker.notify()
456 finally:
457 self._lock.release()
458
460 """Enable/disable processing of tasks.
461
462 This is different from L{Quiesce} in the sense that this function just
463 changes an internal flag and doesn't wait for the queue to be empty. Tasks
464 already being processed continue normally, but no new tasks will be
465 started. New tasks can still be added.
466
467 @type active: bool
468 @param active: Whether tasks should be processed
469
470 """
471 self._lock.acquire()
472 try:
473 self._active = active
474
475 if active:
476
477 self._pool_to_worker.notifyAll()
478 finally:
479 self._lock.release()
480
482 """Waits for a task for a worker.
483
484 @type worker: L{BaseWorker}
485 @param worker: Worker thread
486
487 """
488 while True:
489 if self._ShouldWorkerTerminateUnlocked(worker):
490 return _TERMINATE
491
492
493 if self._active and self._tasks:
494
495 try:
496 task = heapq.heappop(self._tasks)
497 finally:
498 self._worker_to_pool.notifyAll()
499
500 (_, _, task_id, args) = task
501
502
503 if args is None:
504
505 logging.debug("Found abandoned task (%r)", task)
506 continue
507
508
509 if task_id is not None:
510 del self._taskdata[task_id]
511
512 return task
513
514 logging.debug("Waiting for tasks")
515
516
517 self._pool_to_worker.wait()
518
519 logging.debug("Notified while waiting")
520
522 """Returns whether a worker should terminate.
523
524 """
525 return (worker in self._termworkers)
526
528 """Checks whether there's a task running in a worker.
529
530 """
531 for worker in self._workers + self._termworkers:
532 if worker._HasRunningTaskUnlocked():
533 return True
534 return False
535
545
547 """Waits until the task queue is empty.
548
549 """
550 self._lock.acquire()
551 try:
552 self._quiescing = True
553
554
555 while self._tasks or self._HasRunningTasksUnlocked():
556 self._worker_to_pool.wait()
557
558 finally:
559 self._quiescing = False
560
561
562 self._pool_to_pool.notifyAll()
563
564 self._lock.release()
565
567 """Return an identifier for a new worker.
568
569 """
570 self._last_worker_id += 1
571
572 return "%s%d" % (self._name, self._last_worker_id)
573
575 """Changes the number of workers.
576
577 """
578 assert num_workers >= 0, "num_workers must be >= 0"
579
580 logging.debug("Resizing to %s workers", num_workers)
581
582 current_count = len(self._workers)
583
584 if current_count == num_workers:
585
586 pass
587
588 elif current_count > num_workers:
589 if num_workers == 0:
590
591 termworkers = self._workers[:]
592 del self._workers[:]
593 else:
594
595 raise NotImplementedError()
596
597
598 self._termworkers += termworkers
599
600
601 self._pool_to_worker.notifyAll()
602
603
604 self._lock.release()
605 try:
606 for worker in termworkers:
607 logging.debug("Waiting for thread %s", worker.getName())
608 worker.join()
609 finally:
610 self._lock.acquire()
611
612
613
614
615 for worker in termworkers:
616 assert worker in self._termworkers, ("Worker not in list of"
617 " terminating workers")
618 if not worker.isAlive():
619 self._termworkers.remove(worker)
620
621 assert not self._termworkers, "Zombie worker detected"
622
623 elif current_count < num_workers:
624
625 for _ in range(num_workers - current_count):
626 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
627 self._workers.append(worker)
628 worker.start()
629
def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Takes the pool lock, hands the request over to C{_ResizeUnlocked} and
    releases the lock again, even if the call raises.

    @param num_workers: the new number of workers
    @return: whatever C{_ResizeUnlocked} returns

    """
    pool_lock = self._lock

    pool_lock.acquire()
    try:
        outcome = self._ResizeUnlocked(num_workers)
    finally:
        pool_lock.release()

    return outcome
641
643 """Terminate all worker threads.
644
645 Unstarted tasks will be ignored.
646
647 """
648 logging.debug("Terminating all workers")
649
650 self._lock.acquire()
651 try:
652 self._ResizeUnlocked(0)
653
654 if self._tasks:
655 logging.debug("There are %s tasks left", len(self._tasks))
656 finally:
657 self._lock.release()
658
659 logging.debug("All workers terminated")
660