
Source Code for Module ganeti.workerpool

#
#

# Copyright (C) 2008, 2009, 2010 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Base classes for worker pools.

"""

import logging
import threading
import heapq
import itertools

from ganeti import compat
from ganeti import errors


_TERMINATE = object()
_DEFAULT_PRIORITY = 0


class DeferTask(Exception):
  """Special exception class to defer a task.

  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
  task. Optionally, the priority of the task can be changed.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    Exception.__init__(self)
    self.priority = priority


class NoSuchTask(Exception):
  """Exception raised when a task can't be found.

  """


class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    """
    return (self._current_task is not None)

  def GetCurrentOrderAndTaskId(self):
    """Returns the order and task ID of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (_, order_id, task_id, _) = self._current_task

      return (order_id, task_id)
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            self.SetTaskName(None)
            logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          logging.exception("Caught unhandled exception")

          assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, task_id, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority, task_id)

          if self._current_task:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()


class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  @type _tasks: list of tuples
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
    arguments). Priority and order ID are numeric and essentially control the
    sort order. The order ID is an increasing number denoting the order in
    which tasks are added to the queue. The task ID is controlled by the user
    of the worker pool, see L{AddTask} for details. The task arguments are
    C{None} for abandoned tasks, otherwise a sequence of arguments to be
    passed to L{BaseWorker.RunTask}. The list must fulfill the heap property
    (for use by the C{heapq} module).
  @type _taskdata: dict; (task IDs as keys, tuples as values)
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param num_workers: number of workers to be started
      (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
      should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority, task_id):
    """Adds a task to the internal queue.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"
    assert task_id is None or isinstance(task_id, (int, long)), \
      "Task ID must be numeric or None"

    task = [priority, self._counter.next(), task_id, args]

    if task_id is not None:
      assert task_id not in self._taskdata
      # Keep a reference to change priority later if necessary
      self._taskdata[task_id] = task

    # A counter is used to ensure elements are processed in their incoming
    # order. For processing they're sorted by priority and then counter.
    heapq.heappush(self._tasks, task)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID can be essentially anything that can be used as a
      dictionary key. Callers, however, must ensure a task ID is unique while a
      task is in the pool or while it might return to the pool due to deferring
      using L{DeferTask}.

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
      for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
      "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
      "Task IDs must be in a sequence"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    if task_id is None:
      task_id = [None] * len(tasks)
    elif len(task_id) != len(tasks):
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(task_id), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)
      assert len(tasks) == len(task_id)

      for (args, prio, tid) in zip(tasks, priority, task_id):
        self._AddTaskUnlocked(args, prio, tid)
    finally:
      self._lock.release()

  def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred to by C{task_id} cannot be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      # Find old task
      oldtask = self._taskdata.get(task_id, None)
      if oldtask is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Prepare new task
      newtask = [priority] + oldtask[1:]

      # Mark old entry as abandoned (this doesn't change the sort order and
      # therefore doesn't invalidate the heap property of L{self._tasks}).
      # See also <http://docs.python.org/library/heapq.html#priority-queue-
      # implementation-notes>.
      oldtask[-1] = None

      # Change reference to new task entry and forget the old one
      assert task_id is not None
      self._taskdata[task_id] = newtask

      # Add a new task with the old number and arguments
      heapq.heappush(self._tasks, newtask)

      # Notify a waiting worker
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
      self._active = active

      if active:
        # Tell all workers to continue processing
        self._pool_to_worker.notifyAll()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    @type worker: L{BaseWorker}
    @param worker: Worker thread

    """
    while True:
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      # If there's a pending task, return it immediately
      if self._active and self._tasks:
        # Get task from queue and tell pool about it
        try:
          task = heapq.heappop(self._tasks)
        finally:
          self._worker_to_pool.notifyAll()

        (_, _, task_id, args) = task

        # If the priority was changed, "args" is None
        if args is None:
          # Try again
          logging.debug("Found abandoned task (%r)", task)
          continue

        # Delete reference
        if task_id is not None:
          del self._taskdata[task_id]

        return task

      logging.debug("Waiting for tasks")

      # wait() releases the lock and sleeps until notified
      self._pool_to_worker.wait()

      logging.debug("Notified while waiting")

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")

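
The short program below is an illustrative usage sketch, not part of ganeti/workerpool.py. It assumes the ganeti package is importable and that it runs under the same Python 2 environment as the module above; ExampleWorker, the task arguments and the task ID 100 are made-up names for demonstration. It shows a BaseWorker subclass implementing RunTask, queuing tasks with AddTask and AddManyTasks, re-prioritizing a queued task with ChangeTaskPriority, deferring a task via DeferTask, and draining and shutting down the pool with Quiesce and TerminateWorkers.

# Usage sketch (illustrative only; names and values are hypothetical and
# not part of the original module).
import logging

from ganeti import workerpool


class ExampleWorker(workerpool.BaseWorker):
  def RunTask(self, operation, value): # pylint: disable=W0221
    """Processes one task; arguments come from AddTask/AddManyTasks."""
    self.SetTaskName("%s-%s" % (operation, value))

    if self.ShouldTerminate():
      # The pool is shutting this worker down; give up the task
      return

    if operation == "retry-later":
      # Push the task back into the queue with a lower priority
      # (larger number = popped later by the heap)
      raise workerpool.DeferTask(priority=self.GetCurrentPriority() + 10)

    logging.info("Processed %s(%s)", operation, value)


def main():
  logging.basicConfig(level=logging.INFO)

  pool = workerpool.WorkerPool("Example", 3, ExampleWorker)
  try:
    # One task with an explicit (numeric) task ID so its priority can be
    # changed while it is still queued
    pool.AddTask(("compute", 1), priority=50, task_id=100)

    # Several tasks in one call, all with the default priority
    pool.AddManyTasks([("compute", i) for i in range(2, 10)])

    try:
      pool.ChangeTaskPriority(100, 0)
    except workerpool.NoSuchTask:
      # The task was already picked up (or finished) by a worker
      pass

    # Block until the queue is empty and all workers are idle
    pool.Quiesce()
  finally:
    pool.TerminateWorkers()


if __name__ == "__main__":
  main()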