Multiprocessing on tee'd generators

Consider the following script, in which I test two ways to perform some calculations on generators obtained using itertools.tee:

    #!/usr/bin/env python3

    from sys import argv
    from itertools import tee
    from multiprocessing import Process

    def my_generator():
        for i in range(5):
            print(i)
            yield i

    def double(x):
        return 2 * x

    def compute_double_sum(iterable):
        s = sum(map(double, iterable))
        print(s)

    def square(x):
        return x * x

    def compute_square_sum(iterable):
        s = sum(map(square, iterable))
        print(s)

    g1, g2 = tee(my_generator(), 2)

    try:
        processing_type = argv[1]
    except IndexError:
        processing_type = "no_multi"

    if processing_type == "multi":
        p1 = Process(target=compute_double_sum, args=(g1,))
        p2 = Process(target=compute_square_sum, args=(g2,))
        print("p1 starts")
        p1.start()
        print("p2 starts")
        p2.start()
        p1.join()
        print("p1 finished")
        p2.join()
        print("p2 finished")
    else:
        compute_double_sum(g1)
        compute_square_sum(g2)

Here is what I get when I run the script in normal mode:

    $ ./test_tee.py
    0
    1
    2
    3
    4
    20
    30

And here in parallel:

    $ ./test_tee.py multi
    p1 starts
    p2 starts
    0
    1
    2
    3
    4
    20
    0
    1
    2
    3
    4
    30
    p1 finished
    p2 finished

The original generator is apparently somehow “copied” and executed twice.

I would like to avoid this, because in my real application it seems to cause an error in one of the external libraries that I use to create the initial generator (https://github.com/pysam-developers/pysam/issues/397), while still being able to perform calculations in parallel on the same generated values.

Is there any way to achieve what I want?

2 answers

I found an alternative way: fooobar.com/questions/1263388 / ....

In this approach, the generator is no longer tee'd. Instead, each generated element is duplicated and passed to a composite function that applies both computations to that element within a single process, while a multiprocessing Pool distributes the elements across workers (is this what is called the map/reduce approach?):

    #!/usr/bin/env python3

    from itertools import starmap
    from multiprocessing import Pool
    from functools import reduce
    from operator import add

    def my_generator():
        for i in range(5):
            print(i)
            yield i

    def double(x):
        return 2 * x

    def square(x):
        return x * x

    def double_and_square(args_list):
        return (double(*args_list[0]), square(*args_list[1]))

    def sum_tuples(tup1, tup2):
        return tuple(starmap(add, zip(tup1, tup2)))

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            double_and_square,
            (((arg,), (arg,)) for arg in my_generator()))
        print(reduce(sum_tuples, results_generator))

This works on the toy example. Now I have to figure out how to organize the calculations in my real application in the same way.

I tried to generalize this using a higher-order function (make_funcs_applier) to generate a composite function (apply_funcs), but I get the following error:

 AttributeError: Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs' 
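The error arises because pickle serializes functions by reference, that is, by module and qualified name, so a function defined inside another function cannot be looked up again in a worker process. Here is a minimal sketch of this, assuming a `make_funcs_applier` shaped roughly like the one described above (its body is a guess, not the original code, and `can_pickle` is a hypothetical helper):

```python
import pickle


def double(x):
    """Module-level function: pickle only needs to store its name."""
    return 2 * x


def make_funcs_applier(funcs):
    """Guessed reconstruction of the higher-order function."""
    def apply_funcs(args_list):
        # apply_funcs is a local object: its qualified name contains
        # "<locals>", so it cannot be imported by name elsewhere.
        return tuple(func(*args) for func, args in zip(funcs, args_list))
    return apply_funcs


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, either exception type
        # may be raised for a local object.
        return False
    return True


if __name__ == "__main__":
    print(can_pickle(double))
    print(can_pickle(make_funcs_applier((double,))))
```

The composite function still works when called directly in the current process. It only fails once multiprocessing tries to send it to a worker.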

More generalized attempt

Based on a suggestion in the comments, I tried to improve the above solution so that it can be reused:

    #!/usr/bin/env python3
    """This script tries to work around some limitations of multiprocessing."""

    from itertools import repeat, starmap
    from multiprocessing import Pool
    from functools import reduce
    from operator import add

    # Doesn't work because local functions can't be pickled:
    # def make_tuple_func(funcs):
    #     def tuple_func(args_list):
    #         return tuple(func(args) for func, args in zip(funcs, args_list))
    #     return tuple_func
    #
    # test_tuple_func = make_tuple_func((plus_one, double, square))

    class FuncApplier(object):
        """This kind of object can be used to group functions and call them on a
        tuple of arguments."""
        __slots__ = ("funcs", )

        def __init__(self, funcs):
            self.funcs = funcs

        def __len__(self):
            return len(self.funcs)

        def __call__(self, args_list):
            return tuple(func(args) for func, args in zip(self.funcs, args_list))

        def fork_args(self, args_list):
            """Takes an arguments list and repeat them in a n-tuple."""
            return tuple(repeat(args_list, len(self)))

    def sum_tuples(*tuples):
        """Element-wise sum of tuple items."""
        return tuple(starmap(add, zip(*tuples)))

    # Can't define these functions in main:
    # They wouldn't be pickleable.
    def plus_one(x):
        return x + 1

    def double(x):
        return 2 * x

    def square(x):
        return x * x

    def main():
        def my_generator():
            for i in range(5):
                print(i)
                yield i

        test_tuple_func = FuncApplier((plus_one, double, square))
        with Pool(processes=5) as pool:
            results_generator = pool.imap_unordered(
                test_tuple_func,
                (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
            print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
                sum_tuples, results_generator))
        exit(0)

    if __name__ == "__main__":
        exit(main())

Testing:

    $ ./test_fork.py
    0
    1
    2
    3
    4
    sum of x+1:	15
    sum of 2*x:	20
    sum of x*x:	30

This still leaves some annoying limitations for me, because I often define local functions in my code.
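One possible way to soften this limitation, sketched here under the assumption that the local functions mainly exist to capture a parameter: `functools.partial` objects can be pickled as long as the wrapped function lives at module level. The names `scale` and `power` are hypothetical and only serve the illustration.

```python
import pickle
from functools import partial
from multiprocessing import Pool


def scale(factor, x):
    """Module-level function, so pickle can find it by name."""
    return factor * x


def power(exponent, x):
    """Module-level function raising x to a fixed exponent."""
    return x ** exponent


# Partials built from module-level functions play the role that
# closures such as "lambda x: 2 * x" would otherwise play.
double = partial(scale, 2)
square = partial(power, 2)


if __name__ == "__main__":
    # A partial survives a pickle round trip...
    restored = pickle.loads(pickle.dumps(double))
    print(restored(21))
    # ...so it can also be handed to pool workers.
    with Pool(processes=2) as pool:
        print(pool.map(square, range(5)))
```

This does not help when the local function closes over something that is itself unpicklable. In that case a small class with `__call__`, like `FuncApplier` above, remains the more general option.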


The multiprocessing system imports your main module into each process it starts. Therefore, the module-level code is executed in each process.

You can avoid this by using the always-recommended

 if __name__ == '__main__': 

after the class and function definitions, so that the main program's code runs only in the launching process. This should only be strictly required on Windows, but it might be worth a try, since you complain that the code runs twice.
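To illustrate the guard together with the goal from the question, here is a rough sketch in which the generator is consumed exactly once, in the parent process, and each value is forwarded to two workers through queues. This is one possible arrangement, not something taken from the answers above. `sum_from_queue`, `run` and the `SENTINEL` convention are hypothetical names, and the sketch assumes `None` is never a legitimate generated value.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # end-of-stream marker (assumes None is never real data)


def my_generator():
    for i in range(5):
        yield i


def double(x):
    return 2 * x


def square(x):
    return x * x


def sum_from_queue(func, in_queue, out_queue):
    """Worker: apply func to every queued value and report the sum."""
    total = 0
    # iter(callable, sentinel) keeps calling get() until SENTINEL arrives.
    for value in iter(in_queue.get, SENTINEL):
        total += func(value)
    out_queue.put((func.__name__, total))


def run():
    funcs = (double, square)
    in_queues = [Queue() for _ in funcs]
    out_queue = Queue()
    workers = [
        Process(target=sum_from_queue, args=(func, queue, out_queue))
        for func, queue in zip(funcs, in_queues)
    ]
    for worker in workers:
        worker.start()
    # The generator runs only here, in the parent process.
    for value in my_generator():
        for queue in in_queues:
            queue.put(value)
    for queue in in_queues:
        queue.put(SENTINEL)
    # Collect results before joining so no worker blocks on a full pipe.
    results = dict(out_queue.get() for _ in workers)
    for worker in workers:
        worker.join()
    return results


if __name__ == "__main__":
    # Guarded so that child processes importing this module do not
    # start workers of their own.
    print(run())
```

With the toy generator this should yield the same sums as in the question (20 for `double`, 30 for `square`), while the generator body is executed a single time.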


Source: https://habr.com/ru/post/1263387/

