
Source Code for Module ganeti.workerpool

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2008, 2009, 2010 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Base classes for worker pools. 
 32   
 33  """ 
 34   
 35  import logging 
 36  import threading 
 37  import heapq 
 38  import itertools 
 39   
 40  from ganeti import compat 
 41  from ganeti import errors 
 42   
 43   
 44  _TERMINATE = object() 
 45  _DEFAULT_PRIORITY = 0 
 46   
 47   
 48  class DeferTask(Exception):
 49    """Special exception class to defer a task.
 50
 51    This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
 52    task. Optionally, the priority of the task can be changed.
 53
 54    """
 55    def __init__(self, priority=None):
 56      """Initializes this class.
 57
 58      @type priority: number
 59      @param priority: New task priority (None means no change)
 60
 61      """
 62      Exception.__init__(self)
 63      self.priority = priority
 64
 65
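A minimal sketch (not part of the module) of how a task might defer itself
from within L{BaseWorker.RunTask}. The worker class and the job's IsReady and
Execute helpers are hypothetical; only DeferTask and GetCurrentPriority come
from this module:

    class RetryingWorker(BaseWorker):
      def RunTask(self, job):
        if not job.IsReady():
          # Re-queue this task at a slightly lower urgency (larger numbers
          # mean lower priority); run() catches DeferTask and re-adds the task
          raise DeferTask(priority=self.GetCurrentPriority() + 1)
        job.Execute()
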
 66  class NoSuchTask(Exception):
 67    """Exception raised when a task can't be found.
 68
 69    """
 70
 71
 72  class BaseWorker(threading.Thread, object):
 73    """Base worker class for worker pools.
 74
 75    Users of a worker pool must override RunTask in a subclass.
 76
 77    """
 78    # pylint: disable=W0212
 79    def __init__(self, pool, worker_id):
 80      """Constructor for BaseWorker thread.
 81
 82      @param pool: the parent worker pool
 83      @param worker_id: identifier for this worker
 84
 85      """
 86      super(BaseWorker, self).__init__(name=worker_id)
 87      self.pool = pool
 88      self._worker_id = worker_id
 89      self._current_task = None
 90
 91      assert self.getName() == worker_id
 92
 93    def ShouldTerminate(self):
 94      """Returns whether this worker should terminate.
 95
 96      Should only be called from within L{RunTask}.
 97
 98      """
 99      self.pool._lock.acquire()
100      try:
101        assert self._HasRunningTaskUnlocked()
102        return self.pool._ShouldWorkerTerminateUnlocked(self)
103      finally:
104        self.pool._lock.release()
105
106    def GetCurrentPriority(self):
107      """Returns the priority of the current task.
108
109      Should only be called from within L{RunTask}.
110
111      """
112      self.pool._lock.acquire()
113      try:
114        assert self._HasRunningTaskUnlocked()
115
116        (priority, _, _, _) = self._current_task
117
118        return priority
119      finally:
120        self.pool._lock.release()
121
122    def SetTaskName(self, taskname):
123      """Sets the name of the current task.
124
125      Should only be called from within L{RunTask}.
126
127      @type taskname: string
128      @param taskname: Task's name
129
130      """
131      if taskname:
132        name = "%s/%s" % (self._worker_id, taskname)
133      else:
134        name = self._worker_id
135
136      # Set thread name
137      self.setName(name)
138
139    def _HasRunningTaskUnlocked(self):
140      """Returns whether this worker is currently running a task.
141
142      """
143      return (self._current_task is not None)
144
146 """Returns the order and task ID of the current task. 147 148 Should only be called from within L{RunTask}. 149 150 """ 151 self.pool._lock.acquire() 152 try: 153 assert self._HasRunningTaskUnlocked() 154 155 (_, order_id, task_id, _) = self._current_task 156 157 return (order_id, task_id) 158 finally: 159 self.pool._lock.release()
160
161    def run(self):
162      """Main thread function.
163
164      Waits for new tasks to show up in the queue.
165
166      """
167      pool = self.pool
168
169      while True:
170        assert self._current_task is None
171
172        defer = None
173        try:
174          # Wait on lock to be told either to terminate or to do a task
175          pool._lock.acquire()
176          try:
177            task = pool._WaitForTaskUnlocked(self)
178
179            if task is _TERMINATE:
180              # Told to terminate
181              break
182
183            if task is None:
184              # Spurious notification, ignore
185              continue
186
187            self._current_task = task
188
189            # No longer needed, dispose of reference
190            del task
191
192            assert self._HasRunningTaskUnlocked()
193
194          finally:
195            pool._lock.release()
196
197          (priority, _, _, args) = self._current_task
198          try:
199            # Run the actual task
200            assert defer is None
201            logging.debug("Starting task %r, priority %s", args, priority)
202            assert self.getName() == self._worker_id
203            try:
204              self.RunTask(*args) # pylint: disable=W0142
205            finally:
206              self.SetTaskName(None)
207            logging.debug("Done with task %r, priority %s", args, priority)
208          except DeferTask, err:
209            defer = err
210
211            if defer.priority is None:
212              # Use same priority
213              defer.priority = priority
214
215            logging.debug("Deferring task %r, new priority %s",
216                          args, defer.priority)
217
218            assert self._HasRunningTaskUnlocked()
219        except: # pylint: disable=W0702
220          logging.exception("Caught unhandled exception")
221
222          assert self._HasRunningTaskUnlocked()
223        finally:
224          # Notify pool
225          pool._lock.acquire()
226          try:
227            if defer:
228              assert self._current_task
229              # Schedule again for later run
230              (_, _, task_id, args) = self._current_task
231              pool._AddTaskUnlocked(args, defer.priority, task_id)
232
233            if self._current_task:
234              self._current_task = None
235              pool._worker_to_pool.notifyAll()
236          finally:
237            pool._lock.release()
238
239        assert not self._HasRunningTaskUnlocked()
240
241      logging.debug("Terminates")
242
243    def RunTask(self, *args):
244      """Function called to start a task.
245
246      This needs to be implemented by child classes.
247
248      """
249      raise NotImplementedError()
250
251
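As the BaseWorker docstring notes, RunTask must be overridden in a subclass.
A minimal sketch of such a subclass; the LogWorker name and its single
"message" argument are made up for illustration, while SetTaskName,
ShouldTerminate and getName are this module's BaseWorker API:

    class LogWorker(BaseWorker):
      def RunTask(self, message):
        # Expose the task in the thread name while it runs; run() resets it
        self.SetTaskName(str(message))
        if self.ShouldTerminate():
          # The pool asked this worker to stop, give up early
          return
        logging.info("%s processing %r", self.getName(), message)
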
252  class WorkerPool(object):
253    """Worker pool with a queue.
254
255    This class is thread-safe.
256
257    Tasks are guaranteed to be started in the order in which they're
258    added to the pool. Due to the nature of threading, they're not
259    guaranteed to finish in the same order.
260
261    @type _tasks: list of tuples
262    @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
263      arguments). Priority and order ID are numeric and essentially control the
264      sort order. The order ID is an increasing number denoting the order in
265      which tasks are added to the queue. The task ID is controlled by the user
266      of the workerpool, see L{AddTask} for details. The task arguments are
267      C{None} for abandoned tasks, otherwise a sequence of arguments to be
268      passed to L{BaseWorker.RunTask}. The list must fulfill the heap property
269      (for use by the C{heapq} module).
270    @type _taskdata: dict (task IDs as keys, task entries as values)
271    @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
272
273    """
274    def __init__(self, name, num_workers, worker_class):
275      """Constructor for worker pool.
276
277      @param num_workers: number of workers to be started
278        (dynamic resizing is not yet implemented)
279      @param worker_class: the class to be instantiated for workers;
280        should derive from L{BaseWorker}
281
282      """
283      # Some of these variables are accessed by BaseWorker
284      self._lock = threading.Lock()
285      self._pool_to_pool = threading.Condition(self._lock)
286      self._pool_to_worker = threading.Condition(self._lock)
287      self._worker_to_pool = threading.Condition(self._lock)
288      self._worker_class = worker_class
289      self._name = name
290      self._last_worker_id = 0
291      self._workers = []
292      self._quiescing = False
293
294      # Terminating workers
295      self._termworkers = []
296
297      # Queued tasks
298      self._counter = itertools.count()
299      self._tasks = []
300      self._taskdata = {}
301
302      # Start workers
303      self.Resize(num_workers)
304
305    # TODO: Implement dynamic resizing?
306
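The WorkerPool docstring above describes each queued task as a
(priority, order ID, task ID, arguments) list kept in heap order. A small
standalone illustration (plain heapq, independent of the pool) of why that
layout yields priority-first, then first-in-first-out ordering:

    import heapq
    tasks = []
    heapq.heappush(tasks, [0, 1, None, ("first", )])    # default priority
    heapq.heappush(tasks, [0, 2, None, ("second", )])   # same priority, later order ID
    heapq.heappush(tasks, [-1, 3, None, ("urgent", )])  # lower number = higher priority
    # heappop returns "urgent", then "first", then "second": entries compare
    # by priority first and by the increasing order ID second.
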
308 """Wait until the worker pool has finished quiescing. 309 310 """ 311 while self._quiescing: 312 self._pool_to_pool.wait()
313
314    def _AddTaskUnlocked(self, args, priority, task_id):
315      """Adds a task to the internal queue.
316
317      @type args: sequence
318      @param args: Arguments passed to L{BaseWorker.RunTask}
319      @type priority: number
320      @param priority: Task priority
321      @param task_id: Task ID
322
323      """
324      assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
325      assert isinstance(priority, (int, long)), "Priority must be numeric"
326      assert task_id is None or isinstance(task_id, (int, long)), \
327        "Task ID must be numeric or None"
328
329      task = [priority, self._counter.next(), task_id, args]
330
331      if task_id is not None:
332        assert task_id not in self._taskdata
333        # Keep a reference to change priority later if necessary
334        self._taskdata[task_id] = task
335
336      # A counter is used to ensure elements are processed in their incoming
337      # order. For processing they're sorted by priority and then counter.
338      heapq.heappush(self._tasks, task)
339
340      # Notify a waiting worker
341      self._pool_to_worker.notify()
342
343    def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
344      """Adds a task to the queue.
345
346      @type args: sequence
347      @param args: arguments passed to L{BaseWorker.RunTask}
348      @type priority: number
349      @param priority: Task priority
350      @param task_id: Task ID
351      @note: The task ID can be essentially anything that can be used as a
352        dictionary key. Callers, however, must ensure a task ID is unique while a
353        task is in the pool or while it might return to the pool due to deferring
354        using L{DeferTask}.
355
356      """
357      self._lock.acquire()
358      try:
359        self._WaitWhileQuiescingUnlocked()
360        self._AddTaskUnlocked(args, priority, task_id)
361      finally:
362        self._lock.release()
363
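A usage sketch for AddTask, assuming "pool" is an existing WorkerPool whose
workers take a single argument (as in the LogWorker sketch above); the task
arguments, priority and task ID are illustrative:

    # Queue one call to RunTask("reload-config") ahead of default-priority
    # tasks (lower number = more urgent), under a caller-chosen numeric ID
    pool.AddTask(("reload-config", ), priority=-10, task_id=17)
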
364    def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
365      """Add a list of tasks to the queue.
366
367      @type tasks: list of tuples
368      @param tasks: list of args passed to L{BaseWorker.RunTask}
369      @type priority: number or list of numbers
370      @param priority: Priority for all added tasks or a list with the priority
371        for each task
372      @type task_id: list
373      @param task_id: List with the ID for each task
374      @note: See L{AddTask} for a note on task IDs.
375
376      """
377      assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
378        "Each task must be a sequence"
379      assert (isinstance(priority, (int, long)) or
380              compat.all(isinstance(prio, (int, long)) for prio in priority)), \
381        "Priority must be numeric or be a list of numeric values"
382      assert task_id is None or isinstance(task_id, (tuple, list)), \
383        "Task IDs must be in a sequence"
384
385      if isinstance(priority, (int, long)):
386        priority = [priority] * len(tasks)
387      elif len(priority) != len(tasks):
388        raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
389                                     " number of tasks (%s)" %
390                                     (len(priority), len(tasks)))
391
392      if task_id is None:
393        task_id = [None] * len(tasks)
394      elif len(task_id) != len(tasks):
395        raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
396                                     " number of tasks (%s)" %
397                                     (len(task_id), len(tasks)))
398
399      self._lock.acquire()
400      try:
401        self._WaitWhileQuiescingUnlocked()
402
403        assert compat.all(isinstance(prio, (int, long)) for prio in priority)
404        assert len(tasks) == len(priority)
405        assert len(tasks) == len(task_id)
406
407        for (args, prio, tid) in zip(tasks, priority, task_id):
408          self._AddTaskUnlocked(args, prio, tid)
409      finally:
410        self._lock.release()
411
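A usage sketch for AddManyTasks with the same assumptions as above; a single
priority may cover all tasks, while task IDs (if given) are listed per task:

    pool.AddManyTasks([("a", ), ("b", ), ("c", )], priority=5,
                      task_id=[20, 21, 22])
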
412    def ChangeTaskPriority(self, task_id, priority):
413      """Changes a task's priority.
414
415      @param task_id: Task ID
416      @type priority: number
417      @param priority: New task priority
418      @raise NoSuchTask: When the task referred to by C{task_id} can not be found
419        (it may never have existed, may have already been processed, or is
420        currently running)
421
422      """
423      assert isinstance(priority, (int, long)), "Priority must be numeric"
424
425      self._lock.acquire()
426      try:
427        logging.debug("About to change priority of task %s to %s",
428                      task_id, priority)
429
430        # Find old task
431        oldtask = self._taskdata.get(task_id, None)
432        if oldtask is None:
433          msg = "Task '%s' was not found" % task_id
434          logging.debug(msg)
435          raise NoSuchTask(msg)
436
437        # Prepare new task
438        newtask = [priority] + oldtask[1:]
439
440        # Mark old entry as abandoned (this doesn't change the sort order and
441        # therefore doesn't invalidate the heap property of L{self._tasks}).
442        # See also <http://docs.python.org/library/heapq.html#priority-queue-
443        # implementation-notes>.
444        oldtask[-1] = None
445
446        # Change reference to new task entry and forget the old one
447        assert task_id is not None
448        self._taskdata[task_id] = newtask
449
450        # Add a new task with the old number and arguments
451        heapq.heappush(self._tasks, newtask)
452
453        # Notify a waiting worker
454        self._pool_to_worker.notify()
455      finally:
456        self._lock.release()
457
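The abandonment step above (setting the entry's arguments to None rather than
removing it) is the usual heapq lazy-deletion pattern; _WaitForTaskUnlocked
below simply skips such entries. A usage sketch, assuming the task was added
under a known ID and "pool" exists as before:

    pool.AddTask(("resize-disk", ), priority=10, task_id=42)
    try:
      # Move the still-queued task ahead of default-priority work
      pool.ChangeTaskPriority(42, -5)
    except NoSuchTask:
      # The task already ran (or never existed); nothing to change
      pass
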
458    def _WaitForTaskUnlocked(self, worker):
459      """Waits for a task for a worker.
460
461      @type worker: L{BaseWorker}
462      @param worker: Worker thread
463
464      """
465      while True:
466        if self._ShouldWorkerTerminateUnlocked(worker):
467          return _TERMINATE
468
469        # If there's a pending task, return it immediately
470        if self._tasks:
471          # Get task from queue and tell pool about it
472          try:
473            task = heapq.heappop(self._tasks)
474          finally:
475            self._worker_to_pool.notifyAll()
476
477          (_, _, task_id, args) = task
478
479          # If the priority was changed, "args" is None
480          if args is None:
481            # Try again
482            logging.debug("Found abandoned task (%r)", task)
483            continue
484
485          # Delete reference
486          if task_id is not None:
487            del self._taskdata[task_id]
488
489          return task
490
491        logging.debug("Waiting for tasks")
492
493        # wait() releases the lock and sleeps until notified
494        self._pool_to_worker.wait()
495
496        logging.debug("Notified while waiting")
497
498    def _ShouldWorkerTerminateUnlocked(self, worker):
499      """Returns whether a worker should terminate.
500
501      """
502      return (worker in self._termworkers)
503
504    def _HasRunningTasksUnlocked(self):
505      """Checks whether there's a task running in a worker.
506
507      """
508      for worker in self._workers + self._termworkers:
509        if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
510          return True
511      return False
512
513    def HasRunningTasks(self):
514      """Checks whether there's at least one task running.
515
516      """
517      self._lock.acquire()
518      try:
519        return self._HasRunningTasksUnlocked()
520      finally:
521        self._lock.release()
522
523    def Quiesce(self):
524      """Waits until the task queue is empty.
525
526      """
527      self._lock.acquire()
528      try:
529        self._quiescing = True
530
531        # Wait while there are tasks pending or running
532        while self._tasks or self._HasRunningTasksUnlocked():
533          self._worker_to_pool.wait()
534
535      finally:
536        self._quiescing = False
537
538        # Make sure AddTask and AddManyTasks continue in case they were waiting
539        self._pool_to_pool.notifyAll()
540
541        self._lock.release()
542
543    def _NewWorkerIdUnlocked(self):
544      """Return an identifier for a new worker.
545
546      """
547      self._last_worker_id += 1
548
549      return "%s%d" % (self._name, self._last_worker_id)
550
551    def _ResizeUnlocked(self, num_workers):
552      """Changes the number of workers.
553
554      """
555      assert num_workers >= 0, "num_workers must be >= 0"
556
557      logging.debug("Resizing to %s workers", num_workers)
558
559      current_count = len(self._workers)
560
561      if current_count == num_workers:
562        # Nothing to do
563        pass
564
565      elif current_count > num_workers:
566        if num_workers == 0:
567          # Create copy of list to iterate over while lock isn't held.
568          termworkers = self._workers[:]
569          del self._workers[:]
570        else:
571          # TODO: Implement partial downsizing
572          raise NotImplementedError()
573          #termworkers = ...
574
575        self._termworkers += termworkers
576
577        # Notify workers that something has changed
578        self._pool_to_worker.notifyAll()
579
580        # Join all terminating workers
581        self._lock.release()
582        try:
583          for worker in termworkers:
584            logging.debug("Waiting for thread %s", worker.getName())
585            worker.join()
586        finally:
587          self._lock.acquire()
588
589        # Remove terminated threads. This could be done in a more efficient way
590        # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
591        # don't leave zombie threads around.
592        for worker in termworkers:
593          assert worker in self._termworkers, ("Worker not in list of"
594                                               " terminating workers")
595          if not worker.isAlive():
596            self._termworkers.remove(worker)
597
598        assert not self._termworkers, "Zombie worker detected"
599
600      elif current_count < num_workers:
601        # Create (num_workers - current_count) new workers
602        for _ in range(num_workers - current_count):
603          worker = self._worker_class(self, self._NewWorkerIdUnlocked())
604          self._workers.append(worker)
605          worker.start()
606
607    def Resize(self, num_workers):
608      """Changes the number of workers in the pool.
609
610      @param num_workers: the new number of workers
611
612      """
613      self._lock.acquire()
614      try:
615        return self._ResizeUnlocked(num_workers)
616      finally:
617        self._lock.release()
618
619    def TerminateWorkers(self):
620      """Terminate all worker threads.
621
622      Unstarted tasks will be ignored.
623
624      """
625      logging.debug("Terminating all workers")
626
627      self._lock.acquire()
628      try:
629        self._ResizeUnlocked(0)
630
631        if self._tasks:
632          logging.debug("There are %s tasks left", len(self._tasks))
633      finally:
634        self._lock.release()
635
636      logging.debug("All workers terminated")
637
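A short end-to-end usage sketch tying the pieces together, reusing the
hypothetical LogWorker from the BaseWorker example above; the pool name and
worker count are arbitrary:

    pool = WorkerPool("Example", 4, LogWorker)
    try:
      pool.AddTask(("hello", ))
      pool.AddManyTasks([("a", ), ("b", ), ("c", )])
      pool.Quiesce()           # block until the queue is empty and workers are idle
    finally:
      pool.TerminateWorkers()  # stop and join all worker threads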