Python Pool apply_async and map_async do not block on a full queue

I am new to Python. I use the multiprocessing module to read lines of text from stdin, convert them in some way, and write them to a database. Here is a snippet of my code:

    batch = []
    pool = multiprocessing.Pool(20)
    i = 0
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            pool.apply_async(insert, args=(batch, i+1))
            batch = []
    pool.apply_async(insert, args=(batch, i))
    pool.close()
    pool.join()

Everything works fine, until I feed it huge input files (hundreds of millions of lines). At some point, when my database slows down, I see the memory filling up.

After some experimenting it turned out that pool.apply_async, as well as pool.map_async, never block, so the queue of pending calls grows larger and larger.

What is the correct approach to my problem? I would expect a parameter I can set that blocks the pool.apply_async call as soon as a certain queue length is reached. AFAIR in Java one can give a ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.

Thanks!

+8
4 answers

In case someone ends up here: here is how I solved the problem. I stopped using multiprocessing.Pool. This is how I do it now:

    # set amount of concurrent processes that insert db data
    processes = multiprocessing.cpu_count() * 2

    # set up batch queue
    queue = multiprocessing.Queue(processes * 2)

    # start processes
    for _ in range(processes):
        multiprocessing.Process(target=insert, args=(queue,)).start()

    # fill queue with batches
    batch = []
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            queue.put((batch, i+1))
            batch = []
    if batch:
        queue.put((batch, i+1))

    # stop processes using poison pill
    for _ in range(processes):
        queue.put((None, None))

    print "all done."

In the insert method, each batch is processed in a loop that pulls from the queue until it receives the poison pill:

    while True:
        batch, end = queue.get()
        if not batch and not end:
            return  # poison pill! complete!
        [process the batch]
        print 'worker done.'
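
For completeness, a minimal runnable version of such a worker; save_to_db here is a hypothetical stand-in for the actual database write:

    import multiprocessing

    def save_to_db(batch):
        # hypothetical stand-in for the real database insert
        pass

    def insert(queue):
        while True:
            batch, end = queue.get()
            if not batch and not end:
                return  # poison pill! complete!
            save_to_db(batch)
            print 'worker done.'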
+10

apply_async and map_async are designed not to block the main process. To achieve that, Pool uses an internal Queue whose size, unfortunately, cannot be changed.

The way to solve the problem is to use a Semaphore initialized with the size you want the queue to be. You acquire the semaphore before feeding the pool and release it after a worker has completed the task.

Here is an example that works with Python 2.6 or higher.

    from threading import Semaphore
    from multiprocessing import Pool

    def task_wrapper(f):
        """Python 2 does not allow a callback for methods raising
        exceptions; this wrapper ensures the code run in the worker
        is exception free.
        """
        try:
            return f()
        except:
            return None

    class TaskManager(object):
        def __init__(self, processes, queue_size):
            self.pool = Pool(processes=processes)
            self.workers = Semaphore(processes + queue_size)

        def new_task(self, f):
            """Start a new task, blocks if queue is full."""
            self.workers.acquire()
            self.pool.apply_async(task_wrapper, args=(f,),
                                  callback=self.task_done)

        def task_done(self, result):
            """Called once a task is done, releases the semaphore
            so the queue is unblocked."""
            self.workers.release()

The same trick works with the concurrent.futures pool implementation.
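
For instance, a minimal sketch of the same semaphore idea on top of concurrent.futures (built into the standard library since Python 3.2, available as the futures backport for Python 2); the class name BoundedExecutor is made up for this example:

    from threading import BoundedSemaphore
    from concurrent.futures import ProcessPoolExecutor

    class BoundedExecutor(object):
        """Wraps ProcessPoolExecutor so that submit() blocks once
        processes + queue_size tasks are in flight."""
        def __init__(self, processes, queue_size):
            self.executor = ProcessPoolExecutor(max_workers=processes)
            self.semaphore = BoundedSemaphore(processes + queue_size)

        def submit(self, fn, *args, **kwargs):
            self.semaphore.acquire()  # blocks while the queue is full
            future = self.executor.submit(fn, *args, **kwargs)
            future.add_done_callback(lambda _: self.semaphore.release())
            return future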

+9

apply_async returns an AsyncResult object, which you can wait on:

    if len(batch) >= 10000:
        r = pool.apply_async(insert, args=(batch, i+1))
        r.wait()
        batch = []
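
Note that waiting on every result this way leaves only one batch in flight at a time. A variant that keeps a bounded window of pending AsyncResults (the window size of 40 is an arbitrary choice; pool and insert are the ones from the question) could look like this:

    import sys
    from collections import deque

    batch, pending = [], deque()
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            pending.append(pool.apply_async(insert, args=(batch, i+1)))
            batch = []
            if len(pending) >= 40:
                pending.popleft().wait()  # block until the oldest batch is done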

Although if you want to do this in a cleaner way, you should use a multiprocessing.Queue with a maxsize of 10000 and derive a Worker class from multiprocessing.Process that pulls from such a queue.
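
A minimal sketch of that approach; insert here stands for the question's database write, taking just the batch, and the producer side fills the queue as in the accepted answer. Since put() blocks once the queue holds maxsize items, backpressure comes for free:

    import multiprocessing

    class Worker(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue

        def run(self):
            while True:
                batch = self.queue.get()
                if batch is None:
                    break          # poison pill: no more work
                insert(batch)      # the question's database write

    queue = multiprocessing.Queue(maxsize=10000)  # put() blocks when full
    workers = [Worker(queue) for _ in range(20)]
    for w in workers:
        w.start()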

+2

Not exactly, but you can access the pool's internal task queue size and wait until it drops below your desired maximum before adding new items (note that _taskqueue is a private attribute, so this may break between Python versions):

    max_pool_queue_size = 20

    for i in range(10000):
        pool.apply_async(some_func, args=(...))

        while pool._taskqueue.qsize() > max_pool_queue_size:
            time.sleep(1)
0
