Package ganeti :: Module workerpool
[hide private]
[frames | no frames]

Source Code for Module ganeti.workerpool

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2008 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Base classes for worker pools. 
 23   
 24  """ 
 25   
 26  import collections 
 27  import logging 
 28  import threading 
 29   
 30   
class BaseWorker(threading.Thread, object):
    """Base worker class for worker pools.

    Users of a worker pool must override RunTask in a subclass.

    The worker accesses the following attributes of its pool directly:
    C{_lock}, C{_tasks}, C{_pool_to_worker}, C{_worker_to_pool} and
    C{_ShouldWorkerTerminateUnlocked}.

    """
    # pylint: disable-msg=W0212
    def __init__(self, pool, worker_id):
        """Constructor for BaseWorker thread.

        @param pool: the parent worker pool
        @param worker_id: identifier for this worker; used as thread name

        """
        super(BaseWorker, self).__init__(name=worker_id)
        self.pool = pool

        # Argument tuple of the task being run, or None while idle
        self._current_task = None

    def ShouldTerminate(self):
        """Returns whether a worker should terminate.

        Delegates to the pool's C{ShouldWorkerTerminate}.

        """
        return self.pool.ShouldWorkerTerminate(self)

    def _HasRunningTaskUnlocked(self):
        """Returns whether this worker is currently running a task.

        Does not acquire the pool lock itself.

        """
        return (self._current_task is not None)

    def HasRunningTask(self):
        """Returns whether this worker is currently running a task.

        Acquires the pool lock for the duration of the check.

        """
        self.pool._lock.acquire()
        try:
            return self._HasRunningTaskUnlocked()
        finally:
            self.pool._lock.release()

    def run(self):
        """Main thread function.

        Waits for new tasks to show up in the queue, runs them through
        L{RunTask} and returns once the pool reports that this worker
        should terminate. Exceptions raised by a task are logged and do
        not stop the worker.

        """
        pool = self.pool

        assert not self.HasRunningTask()

        while True:
            try:
                # We wait on lock to be told either terminate or do a task.
                pool._lock.acquire()
                try:
                    if pool._ShouldWorkerTerminateUnlocked(self):
                        break

                    # We only wait if there's no task for us.
                    if not pool._tasks:
                        logging.debug("Waiting for tasks")

                        # wait() releases the lock and sleeps until notified
                        pool._pool_to_worker.wait()

                        logging.debug("Notified while waiting")

                        # Were we woken up in order to terminate?
                        if pool._ShouldWorkerTerminateUnlocked(self):
                            break

                        if not pool._tasks:
                            # Spurious notification, ignore
                            continue

                    # Get task from queue and tell pool about it
                    try:
                        self._current_task = pool._tasks.popleft()
                    finally:
                        pool._worker_to_pool.notify_all()
                finally:
                    pool._lock.release()

                # Run the actual task
                try:
                    logging.debug("Starting task %r", self._current_task)
                    self.RunTask(*self._current_task)
                    logging.debug("Done with task %r", self._current_task)
                except:  # pylint: disable-msg=W0702
                    # Deliberately catch everything so that a failing task
                    # can't take the worker thread down with it.
                    logging.exception("Caught unhandled exception")
            finally:
                # Notify pool
                pool._lock.acquire()
                try:
                    # Compare against None instead of testing truthiness: a
                    # task added without arguments is the empty tuple, which
                    # is false, and would otherwise never be cleared.
                    if self._current_task is not None:
                        self._current_task = None
                        pool._worker_to_pool.notify_all()
                finally:
                    pool._lock.release()

        logging.debug("Terminates")

    def RunTask(self, *args):
        """Function called to start a task.

        This needs to be implemented by child classes.

        @param args: the arguments the task was queued with
        @raise NotImplementedError: always, unless overridden

        """
        raise NotImplementedError()
140 141
class WorkerPool(object):
    """Worker pool with a queue.

    This class is thread-safe.

    Tasks are guaranteed to be started in the order in which they're
    added to the pool. Due to the nature of threading, they're not
    guaranteed to finish in the same order.

    """
    def __init__(self, name, num_workers, worker_class):
        """Constructor for worker pool.

        @param name: prefix for worker identifiers; a running number is
                     appended for each worker created
        @param num_workers: number of workers to be started
                            (reducing the pool to a non-zero size is not
                            yet implemented)
        @param worker_class: the class to be instantiated for workers;
                             should derive from L{BaseWorker}

        """
        # Some of these variables are accessed by BaseWorker
        self._lock = threading.Lock()
        self._pool_to_pool = threading.Condition(self._lock)
        self._pool_to_worker = threading.Condition(self._lock)
        self._worker_to_pool = threading.Condition(self._lock)
        self._worker_class = worker_class
        self._name = name
        self._last_worker_id = 0
        self._workers = []
        self._quiescing = False

        # Terminating workers
        self._termworkers = []

        # Queued tasks
        self._tasks = collections.deque()

        # Start workers
        self.Resize(num_workers)

    # TODO: Implement dynamic resizing?

    def AddTask(self, *args):
        """Adds a task to the queue.

        Blocks while the pool is quiescing.

        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        self._lock.acquire()
        try:
            # Don't add new tasks while we're quiescing
            while self._quiescing:
                self._pool_to_pool.wait()

            # Add task to internal queue
            self._tasks.append(args)

            # Wake one idling worker up
            self._pool_to_worker.notify()
        finally:
            self._lock.release()

    def _ShouldWorkerTerminateUnlocked(self, worker):
        """Returns whether a worker should terminate.

        Does not acquire the pool lock itself.

        """
        return (worker in self._termworkers)

    def ShouldWorkerTerminate(self, worker):
        """Returns whether a worker should terminate.

        Acquires the pool lock for the duration of the check.

        """
        self._lock.acquire()
        try:
            return self._ShouldWorkerTerminateUnlocked(worker)
        finally:
            self._lock.release()

    def _HasRunningTasksUnlocked(self):
        """Checks whether there's a task running in a worker.

        Both active and terminating workers are taken into account.

        """
        for worker in self._workers + self._termworkers:
            if worker._HasRunningTaskUnlocked():  # pylint: disable-msg=W0212
                return True
        return False

    def Quiesce(self):
        """Waits until the task queue is empty.

        Returns once no task is pending or running. L{AddTask} blocks
        while this function is waiting.

        """
        self._lock.acquire()
        try:
            self._quiescing = True

            # Wait while there are tasks pending or running
            while self._tasks or self._HasRunningTasksUnlocked():
                self._worker_to_pool.wait()

        finally:
            self._quiescing = False

            # Make sure AddTasks continues in case it was waiting
            self._pool_to_pool.notify_all()

            self._lock.release()

    def _NewWorkerIdUnlocked(self):
        """Return an identifier for a new worker.

        The identifier is the pool name followed by a running number.

        """
        self._last_worker_id += 1

        return "%s%d" % (self._name, self._last_worker_id)

    def _ResizeUnlocked(self, num_workers):
        """Changes the number of workers.

        Must be called with the pool lock held. The lock is released
        while waiting for terminating workers to finish and re-acquired
        before returning.

        @param num_workers: the new number of workers
        @raise NotImplementedError: when reducing the pool to a size
            other than zero

        """
        assert num_workers >= 0, "num_workers must be >= 0"

        logging.debug("Resizing to %s workers", num_workers)

        current_count = len(self._workers)

        if current_count == num_workers:
            # Nothing to do
            pass

        elif current_count > num_workers:
            if num_workers == 0:
                # Create copy of list to iterate over while lock isn't held.
                termworkers = self._workers[:]
                del self._workers[:]
            else:
                # TODO: Implement partial downsizing
                raise NotImplementedError()

            self._termworkers += termworkers

            # Notify workers that something has changed
            self._pool_to_worker.notify_all()

            # Join all terminating workers. The lock must not be held here
            # as workers need it to notice that they should terminate.
            self._lock.release()
            try:
                for worker in termworkers:
                    logging.debug("Waiting for thread %s", worker.name)
                    worker.join()
            finally:
                self._lock.acquire()

            # Remove terminated threads. This could be done in a more
            # efficient way (del self._termworkers[:]), but checking
            # worker.is_alive() makes sure we don't leave zombie threads
            # around.
            for worker in termworkers:
                assert worker in self._termworkers, ("Worker not in list of"
                                                     " terminating workers")
                # is_alive() rather than isAlive(): the camelCase alias was
                # removed from threading.Thread in Python 3.9.
                if not worker.is_alive():
                    self._termworkers.remove(worker)

            assert not self._termworkers, "Zombie worker detected"

        elif current_count < num_workers:
            # Create (num_workers - current_count) new workers
            for _ in range(num_workers - current_count):
                worker = self._worker_class(self, self._NewWorkerIdUnlocked())
                self._workers.append(worker)
                worker.start()

    def Resize(self, num_workers):
        """Changes the number of workers in the pool.

        @param num_workers: the new number of workers
        @raise NotImplementedError: when reducing the pool to a size
            other than zero

        """
        self._lock.acquire()
        try:
            return self._ResizeUnlocked(num_workers)
        finally:
            self._lock.release()

    def TerminateWorkers(self):
        """Terminate all worker threads.

        Unstarted tasks will be ignored.

        """
        logging.debug("Terminating all workers")

        self._lock.acquire()
        try:
            self._ResizeUnlocked(0)

            if self._tasks:
                logging.debug("There are %s tasks left", len(self._tasks))
        finally:
            self._lock.release()

        logging.debug("All workers terminated")
342