Package ganeti :: Module workerpool
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.workerpool

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2008, 2009, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Base classes for worker pools. 
 23   
 24  """ 
 25   
 26  import collections 
 27  import logging 
 28  import threading 
 29   
 30  from ganeti import compat 
 31   
 32   
# Sentinel returned by WorkerPool._WaitForTaskUnlocked when a worker has been
# marked for termination; BaseWorker.run compares the result against it by
# identity ("is") and leaves its main loop when they match.
_TERMINATE = object()
 34   
 35   
class BaseWorker(threading.Thread, object):
    """Base worker class for worker pools.

    Users of a worker pool must override RunTask in a subclass.

    """
    # pylint: disable-msg=W0212
    def __init__(self, pool, worker_id):
        """Constructor for BaseWorker thread.

        @param pool: the parent worker pool
        @param worker_id: identifier for this worker; also used as the
            thread name while no task is running

        """
        super(BaseWorker, self).__init__(name=worker_id)
        self.pool = pool
        self._worker_id = worker_id
        # Argument sequence of the task being run, or None while idle. Note
        # that a task may legitimately be an empty (falsy) sequence.
        self._current_task = None

        assert self.getName() == worker_id

    def ShouldTerminate(self):
        """Returns whether this worker should terminate.

        Should only be called from within L{RunTask}.

        """
        self.pool._lock.acquire()
        try:
            assert self._HasRunningTaskUnlocked()
            return self.pool._ShouldWorkerTerminateUnlocked(self)
        finally:
            self.pool._lock.release()

    def SetTaskName(self, taskname):
        """Sets the name of the current task.

        Should only be called from within L{RunTask}.

        @type taskname: string
        @param taskname: Task's name; a false value restores the plain
            worker identifier as thread name

        """
        if taskname:
            name = "%s/%s" % (self._worker_id, taskname)
        else:
            name = self._worker_id

        # Set thread name
        self.setName(name)

    def _HasRunningTaskUnlocked(self):
        """Returns whether this worker is currently running a task.

        """
        return (self._current_task is not None)

    def run(self):
        """Main thread function.

        Waits for new tasks to show up in the queue and runs them until the
        pool tells this worker to terminate.

        """
        pool = self.pool

        while True:
            assert self._current_task is None
            try:
                # Wait on lock to be told either to terminate or to do a task
                pool._lock.acquire()
                try:
                    task = pool._WaitForTaskUnlocked(self)

                    if task is _TERMINATE:
                        # Told to terminate
                        break

                    if task is None:
                        # Spurious notification, ignore
                        continue

                    self._current_task = task

                    # No longer needed, dispose of reference
                    del task

                    assert self._HasRunningTaskUnlocked()

                finally:
                    pool._lock.release()

                # Run the actual task
                try:
                    logging.debug("Starting task %r", self._current_task)
                    assert self.getName() == self._worker_id
                    try:
                        self.RunTask(*self._current_task)
                    finally:
                        self.SetTaskName(None)
                    logging.debug("Done with task %r", self._current_task)
                except: # pylint: disable-msg=W0702
                    # Deliberately catch everything: an exception escaping
                    # here would silently end this worker thread
                    logging.exception("Caught unhandled exception")

                assert self._HasRunningTaskUnlocked()
            finally:
                # Notify pool
                pool._lock.acquire()
                try:
                    # Compare against None explicitly: a task without
                    # arguments is an empty (falsy) sequence, but it still
                    # has to be cleared and reported as finished
                    if self._current_task is not None:
                        self._current_task = None
                        pool._worker_to_pool.notifyAll()
                finally:
                    pool._lock.release()

        assert not self._HasRunningTaskUnlocked()

        logging.debug("Terminates")

    def RunTask(self, *args):
        """Function called to start a task.

        This needs to be implemented by child classes.

        """
        raise NotImplementedError()


class WorkerPool(object):
    """Worker pool with a queue.

    This class is thread-safe.

    Tasks are guaranteed to be started in the order in which they're
    added to the pool. Due to the nature of threading, they're not
    guaranteed to finish in the same order.

    """
    def __init__(self, name, num_workers, worker_class):
        """Constructor for worker pool.

        @param name: prefix for worker thread names; a per-pool counter is
            appended to it (see L{_NewWorkerIdUnlocked})
        @param num_workers: number of workers to be started
            (shrinking to a non-zero size is not yet implemented)
        @param worker_class: the class to be instantiated for workers;
            should derive from L{BaseWorker}

        """
        # Some of these variables are accessed by BaseWorker
        self._lock = threading.Lock()
        # All conditions share self._lock:
        #  - _pool_to_pool: wakes AddTask/AddManyTasks once quiescing ends
        #  - _pool_to_worker: wakes idle workers (new task or termination)
        #  - _worker_to_pool: wakes Quiesce when a task is taken or finished
        self._pool_to_pool = threading.Condition(self._lock)
        self._pool_to_worker = threading.Condition(self._lock)
        self._worker_to_pool = threading.Condition(self._lock)
        self._worker_class = worker_class
        self._name = name
        self._last_worker_id = 0
        self._workers = []
        self._quiescing = False

        # Terminating workers
        self._termworkers = []

        # Queued tasks
        self._tasks = collections.deque()

        # Start workers
        self.Resize(num_workers)

    # TODO: Implement dynamic resizing?

    def _WaitWhileQuiescingUnlocked(self):
        """Wait until the worker pool has finished quiescing.

        Must be called with the pool lock held; the lock is released while
        waiting and re-acquired before returning.

        """
        while self._quiescing:
            self._pool_to_pool.wait()

    def _AddTaskUnlocked(self, args):
        """Appends a task to the queue and wakes up one waiting worker.

        Must be called with the pool lock held.

        @type args: tuple or list
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

        self._tasks.append(args)

        # Notify a waiting worker
        self._pool_to_worker.notify()

    def AddTask(self, args):
        """Adds a task to the queue.

        Blocks while the pool is quiescing.

        @type args: sequence
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()
            self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def AddManyTasks(self, tasks):
        """Add a list of tasks to the queue.

        Blocks while the pool is quiescing.

        @type tasks: list of tuples
        @param tasks: list of args passed to L{BaseWorker.RunTask}; any
            iterable (including a generator) is accepted

        """
        # Materialize first: the validity check and the insertion loop both
        # iterate over the tasks, and a one-shot iterator would be exhausted
        # by the check, silently adding no tasks at all
        tasks = list(tasks)

        assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
            "Each task must be a sequence"

        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()

            for args in tasks:
                self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def _WaitForTaskUnlocked(self, worker):
        """Waits for a task for a worker.

        Must be called with the pool lock held.

        @type worker: L{BaseWorker}
        @param worker: Worker thread
        @return: the next task's arguments, C{_TERMINATE} if the worker
            should exit, or C{None} if a notification brought no task

        """
        if self._ShouldWorkerTerminateUnlocked(worker):
            return _TERMINATE

        # We only wait if there's no task for us.
        if not self._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            self._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if self._ShouldWorkerTerminateUnlocked(worker):
                return _TERMINATE

            if not self._tasks:
                # Spurious notification, ignore
                return None

        # Get task from queue and tell pool about it
        try:
            return self._tasks.popleft()
        finally:
            self._worker_to_pool.notifyAll()

    def _ShouldWorkerTerminateUnlocked(self, worker):
        """Returns whether a worker should terminate.

        """
        return (worker in self._termworkers)

    def _HasRunningTasksUnlocked(self):
        """Checks whether there's a task running in a worker.

        Both active and terminating workers are considered.

        """
        for worker in self._workers + self._termworkers:
            if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
                return True
        return False

    def Quiesce(self):
        """Waits until the task queue is empty.

        Returns once no task is queued and no worker is running a task.
        While quiescing, L{AddTask} and L{AddManyTasks} block.

        """
        self._lock.acquire()
        try:
            self._quiescing = True

            # Wait while there are tasks pending or running
            while self._tasks or self._HasRunningTasksUnlocked():
                self._worker_to_pool.wait()

        finally:
            self._quiescing = False

            # Make sure AddTasks continues in case it was waiting
            self._pool_to_pool.notifyAll()

            self._lock.release()

    def _NewWorkerIdUnlocked(self):
        """Return an identifier for a new worker.

        The identifier is the pool name followed by an increasing counter,
        starting at 1.

        """
        self._last_worker_id += 1

        return "%s%d" % (self._name, self._last_worker_id)

    def _ResizeUnlocked(self, num_workers):
        """Changes the number of workers.

        Must be called with the pool lock held. When shrinking to zero, the
        lock is temporarily released while joining the terminating workers.

        @raise NotImplementedError: when shrinking to a non-zero size

        """
        assert num_workers >= 0, "num_workers must be >= 0"

        logging.debug("Resizing to %s workers", num_workers)

        current_count = len(self._workers)

        if current_count == num_workers:
            # Nothing to do
            pass

        elif current_count > num_workers:
            if num_workers == 0:
                # Create copy of list to iterate over while lock isn't held.
                termworkers = self._workers[:]
                del self._workers[:]
            else:
                # TODO: Implement partial downsizing
                raise NotImplementedError()
                #termworkers = ...

            self._termworkers += termworkers

            # Notify workers that something has changed
            self._pool_to_worker.notifyAll()

            # Join all terminating workers; the lock must be released so they
            # can leave their wait and finish running tasks
            self._lock.release()
            try:
                for worker in termworkers:
                    logging.debug("Waiting for thread %s", worker.getName())
                    worker.join()
            finally:
                self._lock.acquire()

            # Remove terminated threads. This could be done in a more efficient
            # way (del self._termworkers[:]), but checking worker.isAlive()
            # makes sure we don't leave zombie threads around.
            # NOTE(review): Thread.isAlive() no longer exists on Python 3.9+;
            # switch to is_alive() once pre-2.6 interpreters are unsupported.
            for worker in termworkers:
                assert worker in self._termworkers, ("Worker not in list of"
                                                     " terminating workers")
                if not worker.isAlive():
                    self._termworkers.remove(worker)

            assert not self._termworkers, "Zombie worker detected"

        elif current_count < num_workers:
            # Create (num_workers - current_count) new workers
            for _ in range(num_workers - current_count):
                worker = self._worker_class(self, self._NewWorkerIdUnlocked())
                self._workers.append(worker)
                worker.start()

    def Resize(self, num_workers):
        """Changes the number of workers in the pool.

        @param num_workers: the new number of workers

        """
        self._lock.acquire()
        try:
            return self._ResizeUnlocked(num_workers)
        finally:
            self._lock.release()

    def TerminateWorkers(self):
        """Terminate all worker threads.

        Unstarted tasks will be ignored.

        """
        logging.debug("Terminating all workers")

        self._lock.acquire()
        try:
            self._ResizeUnlocked(0)

            if self._tasks:
                logging.debug("There are %s tasks left", len(self._tasks))
        finally:
            self._lock.release()

        logging.debug("All workers terminated")
