How to change the number of parallel processes?

I have a Python script that runs a method in parallel.

    from multiprocessing import Pool

    parsers = {
        'parser1': parser1.process,
        'parser2': parser2.process,
    }

    def process_items(item):
        # Each item is a (key, value) tuple; dispatch to the right parser.
        key, value = item
        parsers[key](value)

    pool = Pool(4)
    pool.map(process_items, items)

process_items is my method, and items is a list of two-element tuples. The items list contains about 100,000 items.

process_items then calls the appropriate parser depending on the key. My problem is that about 70% of the items can run with high parallelism, but the remaining 30% can only run with one or two processes at a time; otherwise they crash for reasons outside my control.

In my code I have about 10 different parsers. For example, I want parsers 1-8 to run with Pool(4), but parsers 9-10 with Pool(2).

What is the best way to optimize this?

2 answers

I think your best option is to use two pools here:

    from multiprocessing import Pool

    # import parsers here

    parsers = {
        'parser1': parser1.process,
        'parser2': parser2.process,
        'parser3': parser3.process,
        'parser4': parser4.process,
        'parser5': parser5.process,
        'parser6': parser6.process,
        'parser7': parser7.process,
    }

    # Sets that define which items can use high parallelism,
    # and which must use low parallelism
    high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
    low_par = {"parser2", "parser5"}

    def process_items(item):
        # Each item is a (key, value) tuple; dispatch to the right parser.
        key, value = item
        return parsers[key](value)

    def run_pool(func, items, num_procs, check_set):
        # Run only the items whose key belongs to check_set,
        # using a pool of num_procs worker processes.
        pool = Pool(num_procs)
        out = pool.map(func, (item for item in items if item[0] in check_set))
        pool.close()
        pool.join()
        return out

    if __name__ == "__main__":
        items = [('parser2', x), ...]  # Your list of tuples

        # Process with high parallelism
        high_results = run_pool(process_items, items, 4, high_par)
        # Process with low parallelism
        low_results = run_pool(process_items, items, 2, low_par)

Doing this in a single Pool is possible with clever use of synchronization primitives, but I don't think it would look much cleaner than this. It might also run less efficiently, because the pool would sometimes have to wait for work to finish before it could process a low-parallelism item, even when there are high-parallelism items waiting in the queue.
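For what it's worth, here is a rough sketch of what that single-Pool variant could look like. It reuses the parsers, low_par set, and items list defined above, and shares a Semaphore with the workers so that at most two of them run a low-parallelism parser at the same time:

    from multiprocessing import Pool, Semaphore

    # parsers, low_par and items are the ones defined in the block above.

    low_par_sem = None

    def init_worker(sem):
        # Runs once in each worker process; stores the shared semaphore.
        global low_par_sem
        low_par_sem = sem

    def process_items(item):
        key, value = item
        if key in low_par:
            # At most 2 workers may be inside this block at the same time.
            with low_par_sem:
                return parsers[key](value)
        return parsers[key](value)

    if __name__ == "__main__":
        sem = Semaphore(2)
        pool = Pool(4, initializer=init_worker, initargs=(sem,))
        results = pool.map(process_items, items)
        pool.close()
        pool.join()

The drawback I mentioned still applies: a worker that blocks on the semaphore sits idle instead of picking up a high-parallelism item from the queue.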

This would be a little more complicated if you needed the results from each call to process_items in the same order in which they appeared in the original iterable, since the results from the two Pools would have to be merged, but based on your example I don't think that is a requirement. Let me know if it is and I will try to adjust my answer accordingly.
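If the ordering does turn out to matter, one approach (a sketch, again reusing parsers, high_par, low_par, and items from above) is to carry each item's original index through the pools, then merge the two result lists and sort by that index:

    from multiprocessing import Pool

    def process_indexed(arg):
        # Carry the original position through so results can be re-ordered.
        idx, (key, value) = arg
        return idx, parsers[key](value)

    def run_indexed_pool(num_procs, pairs):
        pool = Pool(num_procs)
        out = pool.map(process_indexed, pairs)
        pool.close()
        pool.join()
        return out

    if __name__ == "__main__":
        indexed = list(enumerate(items))
        high = run_indexed_pool(4, [p for p in indexed if p[1][0] in high_par])
        low = run_indexed_pool(2, [p for p in indexed if p[1][0] in low_par])
        # Merge the two result sets and restore the original input order.
        results = [res for _, res in sorted(high + low, key=lambda pair: pair[0])]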


You can specify the number of worker processes in the constructor for multiprocessing.Pool:

    from multiprocessing import Pool

    def f(x):
        return x * x

    if __name__ == '__main__':
        pool = Pool(5)  # 5 is the number of worker processes
        print(pool.map(f, [1, 2, 3]))
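As a side note, if you omit the argument entirely, Pool defaults to the number of CPUs reported by os.cpu_count(), so an explicit count is only needed when you want something different:

    import os
    from multiprocessing import Pool

    def f(x):
        return x * x

    if __name__ == '__main__':
        print(os.cpu_count())  # how many workers Pool() will create
        pool = Pool()          # no argument: one worker per CPU
        print(pool.map(f, [1, 2, 3]))
        pool.close()
        pool.join()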
