The apply_async and map_async methods are designed not to block the main process. To achieve this, Pool keeps an internal, unbounded task Queue whose size, unfortunately, cannot be changed.
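To see the non-blocking behavior, the sketch below submits 50 tasks to two workers that are all stuck waiting on an Event. The submission loop still finishes, because each task is simply appended to the pool's internal queue. It assumes Python 3 and swaps in multiprocessing.pool.ThreadPool (a thread-backed subclass of Pool with the same apply_async API) so that nothing has to be pickled; demo_unbounded_queue is a hypothetical name.

```python
import threading
from multiprocessing.pool import ThreadPool


def demo_unbounded_queue(n_tasks=50, workers=2):
    """Hypothetical demo: apply_async returns at once even when no worker is free."""
    gate = threading.Event()  # every task blocks here until the gate opens

    def task(i):
        gate.wait()
        return i

    pool = ThreadPool(processes=workers)
    try:
        # Each call returns an AsyncResult immediately, although only
        # `workers` tasks can run and all of them are stuck on `gate`.
        pending = [pool.apply_async(task, (i,)) for i in range(n_tasks)]
        submitted = len(pending)  # reached without blocking
        gate.set()  # let the queued tasks drain
        results = [r.get(timeout=10) for r in pending]
    finally:
        pool.close()
        pool.join()
    return submitted, results
```

With an expensive producer, this means the queue (and memory) can grow without limit while the workers fall behind.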
The way to solve the problem is to use a Semaphore initialized with the size you want the queue to have. Acquire the semaphore before submitting each task, so that submission blocks once the pool is full, and release it after a worker has completed the task.
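A minimal, testable sketch of this acquire/release pattern, assuming Python 3.2+ (where apply_async accepts error_callback) and again using ThreadPool instead of a process Pool to avoid pickling. run_bounded is a hypothetical helper; it records the peak number of outstanding tasks to show that the bound of processes + queue_size holds.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def run_bounded(n_tasks, processes, queue_size):
    """Hypothetical helper: submit n_tasks via apply_async, keeping at most
    processes + queue_size tasks outstanding. Returns (peak, completed)."""
    slots = threading.BoundedSemaphore(processes + queue_size)
    lock = threading.Lock()
    stats = {"outstanding": 0, "peak": 0, "completed": 0}

    def done(_result):
        # Called in the pool's result-handler thread on success or failure.
        with lock:
            stats["outstanding"] -= 1
            stats["completed"] += 1
        slots.release()  # free a slot so a blocked submitter can continue

    pool = ThreadPool(processes=processes)
    try:
        for _ in range(n_tasks):
            slots.acquire()  # blocks while processes + queue_size are outstanding
            with lock:
                stats["outstanding"] += 1
                stats["peak"] = max(stats["peak"], stats["outstanding"])
            pool.apply_async(time.sleep, (0.005,),
                             callback=done, error_callback=done)
    finally:
        pool.close()
        pool.join()  # waits for the remaining tasks and their callbacks
    return stats["peak"], stats["completed"]
```

Passing the same function as error_callback matters: without it, a task that raises never releases its slot. That is the problem the task_wrapper in the Python 2 example works around.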
Here is an example that works with Python 2.6 or higher.
    from threading import Semaphore
    from multiprocessing import Pool


    def task_wrapper(f):
        """Python 2's apply_async has no error_callback: if a task raises,
        the callback never runs and the semaphore slot is never released.
        This wrapper ensures the code run in the worker never raises.
        """
        try:
            return f()
        except Exception:
            return None


    class TaskManager(object):
        def __init__(self, processes, queue_size):
            self.pool = Pool(processes=processes)
            # At most `processes` running tasks plus `queue_size` waiting ones.
            self.workers = Semaphore(processes + queue_size)

        def new_task(self, f):
            """Start a new task, blocks if queue is full.

            f is pickled and sent to a worker process, so it must be a
            module-level function (not a lambda or nested function).
            """
            self.workers.acquire()
            self.pool.apply_async(task_wrapper, args=(f, ),
                                  callback=self.task_done)

        def task_done(self, result):
            """Called with the task's result once it is done; releases
            the queue if it is blocked."""
            self.workers.release()
Another example using the concurrent.futures pool implementation.
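A minimal sketch of the same semaphore idea on top of concurrent.futures, assuming Python 3. BoundedExecutor is a hypothetical wrapper, not part of the standard library. Future.add_done_callback fires on success, failure, and cancellation alike, so no exception-swallowing wrapper is needed.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedExecutor(object):
    """Hypothetical wrapper: submit() blocks once
    max_workers + queue_size futures are outstanding."""

    def __init__(self, max_workers, queue_size):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(max_workers + queue_size)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # blocks while the "queue" is full
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # submission failed, give the slot back
            raise
        # Release the slot when the future finishes, whatever the outcome.
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.shutdown(wait=True)
        return False


if __name__ == "__main__":
    with BoundedExecutor(max_workers=2, queue_size=3) as ex:
        futures = [ex.submit(pow, i, 2) for i in range(10)]
    print([f.result() for f in futures])
```

Swapping in ProcessPoolExecutor should work the same way for picklable functions, since the done callbacks run in the parent process.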