1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import logging
27 import threading
28 import heapq
29 import itertools
30
31 from ganeti import compat
32 from ganeti import errors
33
34
35 _TERMINATE = object()
36 _DEFAULT_PRIORITY = 0
37
38
class DeferTask(Exception):
    """Special exception class to defer a task.

    This class can be raised by L{BaseWorker.RunTask} to defer the execution
    of a task. Optionally, the priority of the task can be changed.

    @ivar priority: Requested new priority; C{None} means the priority is to
        be left unchanged

    """
    def __init__(self, priority=None):
        """Initializes this class.

        @type priority: number
        @param priority: New task priority (None means no change)

        """
        Exception.__init__(self)
        self.priority = priority
55
56
class NoSuchTask(Exception):
    """Exception raised when a task can't be found.

    """
61
62
64 """Base worker class for worker pools.
65
66 Users of a worker pool must override RunTask in a subclass.
67
68 """
69
def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; it is also passed on as
        the thread's name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    # Queue entry currently being processed, None while the worker is idle
    self._current_task = None

    assert self.getName() == worker_id
83
96
def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    The pool's lock is held while the current task entry is read.

    @return: Priority stored in the entry of the task being processed

    """
    self.pool._lock.acquire()
    try:
        assert self._HasRunningTaskUnlocked()

        # Entries have the form (priority, order ID, task ID, arguments)
        (priority, _, _, _) = self._current_task

        return priority
    finally:
        self.pool._lock.release()
112
def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    The thread is renamed to "<worker ID>/<task name>"; an empty or C{None}
    task name resets the thread name to the plain worker ID.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
        name = "%s/%s" % (self._worker_id, taskname)
    else:
        name = self._worker_id

    # Only the thread name changes, the worker ID stays as it is
    self.setName(name)
129
131 """Returns whether this worker is currently running a task.
132
133 """
134 return (self._current_task is not None)
135
137 """Returns the order and task ID of the current task.
138
139 Should only be called from within L{RunTask}.
140
141 """
142 self.pool._lock.acquire()
143 try:
144 assert self._HasRunningTaskUnlocked()
145
146 (_, order_id, task_id, _) = self._current_task
147
148 return (order_id, task_id)
149 finally:
150 self.pool._lock.release()
151
153 """Main thread function.
154
155 Waits for new tasks to show up in the queue.
156
157 """
158 pool = self.pool
159
160 while True:
161 assert self._current_task is None
162
163 defer = None
164 try:
165
166 pool._lock.acquire()
167 try:
168 task = pool._WaitForTaskUnlocked(self)
169
170 if task is _TERMINATE:
171
172 break
173
174 if task is None:
175
176 continue
177
178 self._current_task = task
179
180
181 del task
182
183 assert self._HasRunningTaskUnlocked()
184
185 finally:
186 pool._lock.release()
187
188 (priority, _, _, args) = self._current_task
189 try:
190
191 assert defer is None
192 logging.debug("Starting task %r, priority %s", args, priority)
193 assert self.getName() == self._worker_id
194 try:
195 self.RunTask(*args)
196 finally:
197 self.SetTaskName(None)
198 logging.debug("Done with task %r, priority %s", args, priority)
199 except DeferTask, err:
200 defer = err
201
202 if defer.priority is None:
203
204 defer.priority = priority
205
206 logging.debug("Deferring task %r, new priority %s",
207 args, defer.priority)
208
209 assert self._HasRunningTaskUnlocked()
210 except:
211 logging.exception("Caught unhandled exception")
212
213 assert self._HasRunningTaskUnlocked()
214 finally:
215
216 pool._lock.acquire()
217 try:
218 if defer:
219 assert self._current_task
220
221 (_, _, task_id, args) = self._current_task
222 pool._AddTaskUnlocked(args, defer.priority, task_id)
223
224 if self._current_task:
225 self._current_task = None
226 pool._worker_to_pool.notifyAll()
227 finally:
228 pool._lock.release()
229
230 assert not self._HasRunningTaskUnlocked()
231
232 logging.debug("Terminates")
233
def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes. It receives the task's
    arguments as positional parameters.

    @raise NotImplementedError: Always, unless overridden

    """
    raise NotImplementedError()
241
242
244 """Worker pool with a queue.
245
246 This class is thread-safe.
247
248 Tasks of equal priority are guaranteed to be started in the order in
249 which they're added to the pool. Due to the nature of threading, they're
250 not guaranteed to finish in the same order.
251
252 @type _tasks: list of lists
253 @ivar _tasks: Each entry has the format [priority, order ID, task ID,
254 arguments]. Priority and order ID are numeric and essentially control the
255 sort order. The order ID is an increasing number denoting the order in
256 which tasks are added to the queue. The task ID is controlled by the user
257 of the worker pool, see L{AddTask} for details. The task arguments are
258 C{None} for abandoned tasks, otherwise a sequence of arguments to be passed
259 to L{BaseWorker.RunTask}. The list must fulfill the heap property (for use
260 by the C{heapq} module).
261 @type _taskdata: dict; (task IDs as keys, lists as values)
262 @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
263
264 """
265 - def __init__(self, name, num_workers, worker_class):
266 """Constructor for worker pool.
267
268 @param num_workers: number of workers to be started
269 (dynamic resizing is not yet implemented)
270 @param worker_class: the class to be instantiated for workers;
271 should derive from L{BaseWorker}
272
273 """
274
275 self._lock = threading.Lock()
276 self._pool_to_pool = threading.Condition(self._lock)
277 self._pool_to_worker = threading.Condition(self._lock)
278 self._worker_to_pool = threading.Condition(self._lock)
279 self._worker_class = worker_class
280 self._name = name
281 self._last_worker_id = 0
282 self._workers = []
283 self._quiescing = False
284 self._active = True
285
286
287 self._termworkers = []
288
289
290 self._counter = itertools.count()
291 self._tasks = []
292 self._taskdata = {}
293
294
295 self.Resize(num_workers)
296
297
298
300 """Wait until the worker pool has finished quiescing.
301
302 """
303 while self._quiescing:
304 self._pool_to_pool.wait()
305
307 """Adds a task to the internal queue.
308
309 @type args: sequence
310 @param args: Arguments passed to L{BaseWorker.RunTask}
311 @type priority: number
312 @param priority: Task priority
313 @param task_id: Task ID
314
315 """
316 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
317 assert isinstance(priority, (int, long)), "Priority must be numeric"
318 assert task_id is None or isinstance(task_id, (int, long)), \
319 "Task ID must be numeric or None"
320
321 task = [priority, self._counter.next(), task_id, args]
322
323 if task_id is not None:
324 assert task_id not in self._taskdata
325
326 self._taskdata[task_id] = task
327
328
329
330 heapq.heappush(self._tasks, task)
331
332
333 self._pool_to_worker.notify()
334
def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID can be essentially anything that can be used as a
      dictionary key. Callers, however, must ensure a task ID is unique while a
      task is in the pool or while it might return to the pool due to deferring
      using L{DeferTask}. When assertions are enabled,
      L{_AddTaskUnlocked} additionally insists on a numeric ID or C{None}.

    """
    self._lock.acquire()
    try:
        self._WaitWhileQuiescingUnlocked()
        self._AddTaskUnlocked(args, priority, task_id)
    finally:
        self._lock.release()
355
def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Add a list of tasks to the queue.

    All tasks are added while holding the pool's lock once. Blocks while
    the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @note: See L{AddTask} for a note on task IDs.
    @raise errors.ProgrammerError: If the number of priorities or task IDs
        differs from the number of tasks

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
        "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
        "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
        "Task IDs must be in a sequence"

    # Expand a single priority to one priority per task
    if isinstance(priority, (int, long)):
        priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
        raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                     " number of tasks (%s)" %
                                     (len(priority), len(tasks)))

    # Without task IDs every task gets None
    if task_id is None:
        task_id = [None] * len(tasks)
    elif len(task_id) != len(tasks):
        raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                     " number of tasks (%s)" %
                                     (len(task_id), len(tasks)))

    self._lock.acquire()
    try:
        self._WaitWhileQuiescingUnlocked()

        assert compat.all(isinstance(prio, (int, long)) for prio in priority)
        assert len(tasks) == len(priority)
        assert len(tasks) == len(task_id)

        for (args, prio, tid) in zip(tasks, priority, task_id):
            self._AddTaskUnlocked(args, prio, tid)
    finally:
        self._lock.release()
403
def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    The queue entry is not modified in place. Its arguments are set to
    C{None}, which marks it as abandoned, and a copy carrying the new
    priority is pushed onto the heap and registered under the task ID.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
        logging.debug("About to change priority of task %s to %s",
                      task_id, priority)

        # Find the entry currently registered for the task
        oldtask = self._taskdata.get(task_id, None)
        if oldtask is None:
            msg = "Task '%s' was not found" % task_id
            logging.debug(msg)
            raise NoSuchTask(msg)

        # Same order ID, task ID and arguments, new priority
        newtask = [priority] + oldtask[1:]

        # Abandon the old entry; it stays in the heap and is skipped when
        # it is eventually popped
        oldtask[-1] = None

        # Point the task ID at the new entry
        assert task_id is not None
        self._taskdata[task_id] = newtask

        heapq.heappush(self._tasks, newtask)

        # Wake up one worker waiting for tasks
        self._pool_to_worker.notify()
    finally:
        self._lock.release()
449
def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
        self._active = active

        if active:
            # Wake up all waiting workers so queued tasks get picked up
            self._pool_to_worker.notifyAll()
    finally:
        self._lock.release()
471
473 """Waits for a task for a worker.
474
475 @type worker: L{BaseWorker}
476 @param worker: Worker thread
477
478 """
479 while True:
480 if self._ShouldWorkerTerminateUnlocked(worker):
481 return _TERMINATE
482
483
484 if self._active and self._tasks:
485
486 try:
487 task = heapq.heappop(self._tasks)
488 finally:
489 self._worker_to_pool.notifyAll()
490
491 (_, _, task_id, args) = task
492
493
494 if args is None:
495
496 logging.debug("Found abandoned task (%r)", task)
497 continue
498
499
500 if task_id is not None:
501 del self._taskdata[task_id]
502
503 return task
504
505 logging.debug("Waiting for tasks")
506
507
508 self._pool_to_worker.wait()
509
510 logging.debug("Notified while waiting")
511
513 """Returns whether a worker should terminate.
514
515 """
516 return (worker in self._termworkers)
517
519 """Checks whether there's a task running in a worker.
520
521 """
522 for worker in self._workers + self._termworkers:
523 if worker._HasRunningTaskUnlocked():
524 return True
525 return False
526
536
def Quiesce(self):
    """Waits until the task queue is empty.

    Returns once no tasks are queued and no worker is running a task. The
    quiescing flag is set for the duration of the wait; it is cleared again
    and threads waiting for the end of quiescing are woken up even if the
    wait is interrupted by an exception.

    """
    self._lock.acquire()
    try:
        self._quiescing = True

        # Wait while there are tasks pending or running
        while self._tasks or self._HasRunningTasksUnlocked():
            self._worker_to_pool.wait()

    finally:
        self._quiescing = False

        # Wake up everything waiting for quiescing to end
        self._pool_to_pool.notifyAll()

        self._lock.release()
556
558 """Return an identifier for a new worker.
559
560 """
561 self._last_worker_id += 1
562
563 return "%s%d" % (self._name, self._last_worker_id)
564
566 """Changes the number of workers.
567
568 """
569 assert num_workers >= 0, "num_workers must be >= 0"
570
571 logging.debug("Resizing to %s workers", num_workers)
572
573 current_count = len(self._workers)
574
575 if current_count == num_workers:
576
577 pass
578
579 elif current_count > num_workers:
580 if num_workers == 0:
581
582 termworkers = self._workers[:]
583 del self._workers[:]
584 else:
585
586 raise NotImplementedError()
587
588
589 self._termworkers += termworkers
590
591
592 self._pool_to_worker.notifyAll()
593
594
595 self._lock.release()
596 try:
597 for worker in termworkers:
598 logging.debug("Waiting for thread %s", worker.getName())
599 worker.join()
600 finally:
601 self._lock.acquire()
602
603
604
605
606 for worker in termworkers:
607 assert worker in self._termworkers, ("Worker not in list of"
608 " terminating workers")
609 if not worker.isAlive():
610 self._termworkers.remove(worker)
611
612 assert not self._termworkers, "Zombie worker detected"
613
614 elif current_count < num_workers:
615
616 for _ in range(num_workers - current_count):
617 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
618 self._workers.append(worker)
619 worker.start()
620
def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Acquires the pool's lock and delegates to L{_ResizeUnlocked}.

    @param num_workers: the new number of workers
    @return: Result of L{_ResizeUnlocked}

    """
    self._lock.acquire()
    try:
        return self._ResizeUnlocked(num_workers)
    finally:
        self._lock.release()
632
def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored; how many of them are left in the queue
    is logged at debug level.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
        # Shrinking to zero workers terminates and joins all of them
        self._ResizeUnlocked(0)

        if self._tasks:
            logging.debug("There are %s tasks left", len(self._tasks))
    finally:
        self._lock.release()

    logging.debug("All workers terminated")
651