Package ganeti :: Module workerpool
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.workerpool

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2008, 2009, 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Base classes for worker pools. 
 23   
 24  """ 
 25   
 26  import logging 
 27  import threading 
 28  import heapq 
 29  import itertools 
 30   
 31  from ganeti import compat 
 32  from ganeti import errors 
 33   
 34   
 35  _TERMINATE = object() 
 36  _DEFAULT_PRIORITY = 0 
 37   
 38   
class DeferTask(Exception):
  """Special exception used to postpone a task.

  L{BaseWorker.RunTask} may raise this to have the currently running task
  re-queued for a later run. The task's priority can optionally be changed
  at the same time.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    super(DeferTask, self).__init__()
    self.priority = priority
class NoSuchTask(Exception):
  """Exception raised when a requested task can not be found.

  """
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker (also used as thread name)

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    # Entry from the pool's task heap while a task is running, None otherwise
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    """
    return (self._current_task is not None)

  def GetCurrentOrderAndTaskId(self):
    # NOTE(review): the "def" line for this method was lost in the rendered
    # source; the name is reconstructed from the docstring and return value --
    # confirm against upstream.
    """Returns the order and task ID of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (_, order_id, task_id, _) = self._current_task

      return (order_id, task_id)
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        # "except ... as" is valid on Python 2.6+ and 3.x, unlike the old
        # comma form used previously
        except DeferTask as err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          # Deliberately catch everything: a worker thread must never die
          # because of an exception escaping a task
          logging.exception("Caught unhandled exception")

          assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, task_id, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority, task_id)

          if self._current_task:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  @type _tasks: list of tuples
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
    arguments). Priority and order ID are numeric and essentially control the
    sort order. The order ID is an increasing number denoting the order in
    which tasks are added to the queue. The task ID is controlled by user of
    workerpool, see L{AddTask} for details. The task arguments are C{None} for
    abandoned tasks, otherwise a sequence of arguments to be passed to
    L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
    the C{heapq} module).
  @type _taskdata: dict; (task IDs as keys, tuples as values)
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: name prefix for worker thread identifiers
    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    # NOTE(review): this "def" line was missing from the rendered source; the
    # name is reconstructed from its callers (AddTask, AddManyTasks) --
    # confirm against upstream.
    """Wait until the worker pool has finished quiescing.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority, task_id):
    """Adds a task to the internal queue.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"
    assert task_id is None or isinstance(task_id, (int, long)), \
      "Task ID must be numeric or None"

    # next() works on Python 2.6+ and 3.x, unlike the removed iterator
    # .next() method
    task = [priority, next(self._counter), task_id, args]

    if task_id is not None:
      assert task_id not in self._taskdata
      # Keep a reference to change priority later if necessary
      self._taskdata[task_id] = task

    # A counter is used to ensure elements are processed in their incoming
    # order. For processing they're sorted by priority and then counter.
    heapq.heappush(self._tasks, task)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID can be essentially anything that can be used as a
      dictionary key. Callers, however, must ensure a task ID is unique while a
      task is in the pool or while it might return to the pool due to deferring
      using L{DeferTask}.

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
      for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
      "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
      "Task IDs must be in a sequence"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    if task_id is None:
      task_id = [None] * len(tasks)
    elif len(task_id) != len(tasks):
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(task_id), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)
      assert len(tasks) == len(task_id)

      for (args, prio, tid) in zip(tasks, priority, task_id):
        self._AddTaskUnlocked(args, prio, tid)
    finally:
      self._lock.release()

  def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      # Find old task
      oldtask = self._taskdata.get(task_id, None)
      if oldtask is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Prepare new task
      newtask = [priority] + oldtask[1:]

      # Mark old entry as abandoned (this doesn't change the sort order and
      # therefore doesn't invalidate the heap property of L{self._tasks}).
      # See also <http://docs.python.org/library/heapq.html#priority-queue-
      # implementation-notes>.
      oldtask[-1] = None

      # Change reference to new task entry and forget the old one
      assert task_id is not None
      self._taskdata[task_id] = newtask

      # Add a new task with the old number and arguments
      heapq.heappush(self._tasks, newtask)

      # Notify a waiting worker
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
      self._active = active

      if active:
        # Tell all workers to continue processing
        self._pool_to_worker.notifyAll()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    @type worker: L{BaseWorker}
    @param worker: Worker thread

    """
    while True:
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      # If there's a pending task, return it immediately
      if self._active and self._tasks:
        # Get task from queue and tell pool about it
        try:
          task = heapq.heappop(self._tasks)
        finally:
          self._worker_to_pool.notifyAll()

        (_, _, task_id, args) = task

        # If the priority was changed, "args" is None
        if args is None:
          # Try again
          logging.debug("Found abandoned task (%r)", task)
          continue

        # Delete reference
        if task_id is not None:
          del self._taskdata[task_id]

        return task

      logging.debug("Waiting for tasks")

      # wait() releases the lock and sleeps until notified
      self._pool_to_worker.wait()

      logging.debug("Notified while waiting")

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.is_alive() makes sure
      # we don't leave zombie threads around.
      # is_alive() exists since Python 2.6; the isAlive() alias was removed
      # in Python 3.9.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.is_alive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")