Package ganeti :: Module workerpool :: Class WorkerPool

Class WorkerPool

Worker pool with a queue.

This class is thread-safe.

Tasks are guaranteed to be started in the order in which they're added to the pool. Due to the nature of threading, they're not guaranteed to finish in the same order.
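The ordering guarantee can be illustrated without Ganeti itself. The following is a minimal sketch using only the Python standard library; `run_in_order` and its parameters are hypothetical names, not part of ganeti.workerpool. Tasks are taken from the queue in the order they were added, but a slow early task may finish after a fast later one.

```python
import queue
import threading
import time


def run_in_order(durations, num_workers=2):
    """Run one sleeping task per duration; return (start_order, finish_order).

    Hypothetical helper for illustration only.
    """
    tasks = queue.Queue()
    for task_id, duration in enumerate(durations):
        tasks.put((task_id, duration))

    lock = threading.Lock()
    started, finished = [], []

    def worker():
        while True:
            # Dequeue and record the start under one lock so that the
            # recorded start order matches the queue order exactly.
            with lock:
                try:
                    task_id, duration = tasks.get_nowait()
                except queue.Empty:
                    return
                started.append(task_id)
            time.sleep(duration)  # the "work"; runs outside the lock
            with lock:
                finished.append(task_id)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return started, finished


started, finished = run_in_order([0.2, 0.01, 0.01])
print("started: ", started)
print("finished:", finished)
```

The start order is deterministic here; the finish order depends on how long each task runs and on thread scheduling.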

Instance Methods
 
__init__(self, name, num_workers, worker_class)
Constructor for worker pool.
 
_WaitWhileQuiescingUnlocked(self)
Wait until the worker pool has finished quiescing.
 
_AddTaskUnlocked(self, args, priority, task_id)
Adds a task to the internal queue.
 
AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None)
Adds a task to the queue.
 
AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None)
Add a list of tasks to the queue.
 
ChangeTaskPriority(self, task_id, priority)
Changes a task's priority.
 
SetActive(self, active)
Enable/disable processing of tasks.
 
_WaitForTaskUnlocked(self, worker)
Waits for a task for a worker.
 
_ShouldWorkerTerminateUnlocked(self, worker)
Returns whether a worker should terminate.
 
_HasRunningTasksUnlocked(self)
Checks whether there's a task running in a worker.
 
HasRunningTasks(self)
Checks whether there's at least one task running.
 
Quiesce(self)
Waits until the task queue is empty.
 
_NewWorkerIdUnlocked(self)
Return an identifier for a new worker.
 
_ResizeUnlocked(self, num_workers)
Changes the number of workers.
 
Resize(self, num_workers)
Changes the number of workers in the pool.
 
TerminateWorkers(self)
Terminate all worker threads.

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Instance Variables
_taskdata (dict; task IDs as keys, tuples as values)
Mapping from task IDs to entries in _tasks
_tasks (list of tuples)
Each tuple has the format (priority, order ID, task ID, arguments).

Properties

Inherited from object: __class__

Method Details

__init__(self, name, num_workers, worker_class)
(Constructor)

Constructor for worker pool.

Parameters:
  • num_workers - number of workers to be started (dynamic resizing is not yet implemented)
  • worker_class - the class to be instantiated for workers; should derive from BaseWorker
Overrides: object.__init__

_AddTaskUnlocked(self, args, priority, task_id)

Adds a task to the internal queue.

Parameters:
  • args (sequence) - Arguments passed to BaseWorker.RunTask
  • priority (number) - Task priority
  • task_id - Task ID

AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None)

Adds a task to the queue.

Parameters:
  • args (sequence) - arguments passed to BaseWorker.RunTask
  • priority (number) - Task priority
  • task_id - Task ID

Note: The task ID can be essentially anything that can be used as a dictionary key. Callers, however, must ensure a task ID is unique while a task is in the pool or while it might return to the pool due to deferring using DeferTask.

AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None)

Add a list of tasks to the queue.

Parameters:
  • tasks (list of tuples) - list of args passed to BaseWorker.RunTask
  • priority (number or list of numbers) - Priority for all added tasks or a list with the priority for each task
  • task_id (list) - List with the ID for each task

Note: See AddTask for a note on task IDs.
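The parameter rules above (a single priority or one per task, and an optional list of task IDs) can be sketched as a small normalization step. This is only an illustration: `expand_task_params` is a hypothetical helper, the default priority of 0 is a placeholder for _DEFAULT_PRIORITY (whose value is not given here), and raising ValueError on a length mismatch is an assumption rather than documented behaviour.

```python
def expand_task_params(tasks, priority=0, task_id=None):
    """Return one (args, priority, task_id) triple per task.

    Hypothetical helper; priority=0 stands in for _DEFAULT_PRIORITY.
    """
    count = len(tasks)

    # A list of priorities must match the tasks one-to-one; a single
    # number is applied to every task.
    if isinstance(priority, (list, tuple)):
        if len(priority) != count:
            raise ValueError("Number of priorities does not match number of tasks")
        priorities = list(priority)
    else:
        priorities = [priority] * count

    # Without explicit IDs every task gets None as its ID.
    if task_id is None:
        task_ids = [None] * count
    else:
        if len(task_id) != count:
            raise ValueError("Number of task IDs does not match number of tasks")
        task_ids = list(task_id)

    return list(zip(tasks, priorities, task_ids))


for triple in expand_task_params([("a",), ("b",)], priority=[3, 1],
                                 task_id=["t1", "t2"]):
    print(triple)
```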

ChangeTaskPriority(self, task_id, priority)

Changes a task's priority.

Parameters:
  • task_id - Task ID
  • priority (number) - New task priority
Raises:
  • NoSuchTask - When the task referred to by task_id cannot be found (it may never have existed, may have already been processed, or may currently be running)
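One common way to change the priority of an entry in a heap-based queue is to abandon the old entry and push a new one. The sketch below shows that technique under stated assumptions; it is not taken from the Ganeti source. `TaskQueue` and its methods are hypothetical names, the local `NoSuchTask` class is a stand-in for the exception named above, a smaller number is assumed to mean higher priority, and entries are mutable lists rather than tuples so that an entry can be marked as abandoned in place.

```python
import heapq
import itertools


class NoSuchTask(Exception):
    """Stand-in for the exception named in the documentation."""


class TaskQueue:
    """Hypothetical single-threaded model of a re-prioritizable task heap."""

    def __init__(self):
        self._tasks = []     # heap of [priority, order_id, task_id, args]
        self._taskdata = {}  # task_id -> live heap entry
        self._counter = itertools.count()

    def add(self, args, priority, task_id=None):
        entry = [priority, next(self._counter), task_id, args]
        if task_id is not None:
            self._taskdata[task_id] = entry
        heapq.heappush(self._tasks, entry)

    def change_priority(self, task_id, priority):
        try:
            old = self._taskdata[task_id]
        except KeyError:
            raise NoSuchTask(task_id) from None
        args = old[3]
        # Abandon the old entry in place. Only the first two fields decide
        # the heap order, so the heap property is preserved.
        old[3] = None
        self.add(args, priority, task_id)

    def pop(self):
        """Return (task_id, args) of the next live task, or None if empty."""
        while self._tasks:
            _, _, task_id, args = heapq.heappop(self._tasks)
            if args is None:
                continue  # skip abandoned entries
            self._taskdata.pop(task_id, None)
            return task_id, args
        return None


q = TaskQueue()
q.add(("a",), 10, "A")
q.add(("b",), 10, "B")
q.change_priority("B", 1)
print(q.pop())
print(q.pop())
```

Once a task has been popped it is no longer known to the queue, which matches the documented case where changing the priority of a processed or running task raises NoSuchTask.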

SetActive(self, active)

Enable/disable processing of tasks.

Unlike Quiesce, this function only changes an internal flag and doesn't wait for the queue to be empty. Tasks already being processed continue normally, but no new tasks will be started. New tasks can still be added.

Parameters:
  • active (bool) - Whether tasks should be processed
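The effect of the flag can be modelled in a few lines. This is a single-threaded sketch under simplifying assumptions, not the pool's implementation: `PausableQueue` and its methods are hypothetical names, and the locking and waiting that a real thread pool needs are left out.

```python
import collections


class PausableQueue:
    """Hypothetical model: tasks can always be added, but are only
    handed out while the queue is active."""

    def __init__(self):
        self._tasks = collections.deque()
        self._active = True

    def set_active(self, active):
        # Only flips a flag; does not wait for anything.
        self._active = bool(active)

    def add_task(self, args):
        # Adding is allowed regardless of the flag.
        self._tasks.append(args)

    def next_task(self):
        """Return the next task, or None if paused or empty."""
        if not self._active or not self._tasks:
            return None
        return self._tasks.popleft()


q = PausableQueue()
q.set_active(False)
q.add_task(("job",))
print(q.next_task())  # nothing is handed out while paused
q.set_active(True)
print(q.next_task())
```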

_WaitForTaskUnlocked(self, worker)

Waits for a task for a worker.

Parameters:
  • worker - the worker that is waiting for a task

Resize(self, num_workers)

Changes the number of workers in the pool.

Parameters:
  • num_workers - the new number of workers

TerminateWorkers(self)

Terminate all worker threads.

Unstarted tasks will be ignored.


Instance Variable Details

_tasks

Each tuple has the format (priority, order ID, task ID, arguments). Priority and order ID are numeric and essentially control the sort order. The order ID is an increasing number denoting the order in which tasks are added to the queue. The task ID is controlled by the user of workerpool; see AddTask for details. The task arguments are None for abandoned tasks, otherwise a sequence of arguments to be passed to BaseWorker.RunTask. The list must fulfill the heap property (for use by the heapq module).
Type:
list of tuples
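The sort order described above follows from how Python compares tuples: the priority is compared first, and the order ID breaks ties. A short sketch with the standard heapq module shows this; `drain_in_order` is a hypothetical helper, the sample entries are made up, and treating a smaller number as higher priority is an assumption that follows from heapq being a min-heap.

```python
import heapq


def drain_in_order(entries):
    """Push (priority, order_id, task_id, args) tuples onto a heap and
    return the task IDs in the order they would be popped."""
    heap = []
    for entry in entries:
        heapq.heappush(heap, entry)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


order = drain_in_order([
    (10, 0, "first", ("a",)),
    (5, 1, "urgent", ("b",)),
    (10, 2, "second", ("c",)),
])
print(order)
```

Because each order ID is unique, the comparison never reaches the task ID or the arguments, so those fields do not need to be comparable.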