I found an alternative way: fooobar.com/questions/1263388/...
In this approach, we no longer create a separate generator per function. Instead, the generator runs in a single process, each generated element is duplicated, and the tuples of copies are passed to a composite function whose calls are distributed with a multiprocessing Pool (is this what is called the map/reduce approach?):
#!/usr/bin/env python3
from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def square(x):
    return x * x

def double_and_square(args_list):
    return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
    return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
    results_generator = pool.imap_unordered(
        double_and_square,
        (((arg,), (arg,)) for arg in my_generator()))
    print(reduce(sum_tuples, results_generator))
This works on the toy example. Now I have to figure out how to organize the computations of my real application in the same way.
I tried to generalize this using a higher-order function (make_funcs_applier) that generates the composite function (apply_funcs), but I get the following error:
AttributeError: Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs'
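The error can be reproduced without multiprocessing at all: pickle serializes functions by qualified name, and a nested function like apply_funcs only exists inside its enclosing call. A minimal sketch (make_funcs_applier here is a reduced stand-in for the factory mentioned above):

```python
import pickle

def make_funcs_applier(funcs):
    # Reduced stand-in for the higher-order function described above.
    def apply_funcs(args_list):
        return tuple(f(a) for f, a in zip(funcs, args_list))
    return apply_funcs

composite = make_funcs_applier((abs, len))
try:
    pickle.dumps(composite)
    picklable = True
except (AttributeError, pickle.PicklingError) as err:
    # e.g. "Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs'"
    picklable = False
    print(err)
print("picklable:", picklable)
```

Since Pool pickles the callable to send it to worker processes, anything that fails this standalone check will also fail inside imap_unordered.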
More generalized attempt
Based on a suggestion made in the comments, I tried to improve the above solution so that it can be reused:
#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""
from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """This kind of object can be used to group functions
    and call them on a tuple of arguments."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __len__(self):
        return len(self.funcs)

    def __call__(self, args_list):
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    def fork_args(self, args_list):
        """Takes an argument list and repeats it in an n-tuple."""
        return tuple(repeat(args_list, len(self)))

def sum_tuples(*tuples):
    """Element-wise sum of tuple items."""
    return tuple(starmap(add, zip(*tuples)))

# Can't define these functions in main():
# they wouldn't be pickleable.
def plus_one(x):
    return x + 1

def double(x):
    return 2 * x

def square(x):
    return x * x

def main():
    def my_generator():
        for i in range(5):
            print(i)
            yield i

    test_tuple_func = FuncApplier((plus_one, double, square))
    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list)
             for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    return 0

if __name__ == "__main__":
    exit(main())
Testing:
$ ./test_fork.py
0
1
2
3
4
sum of x+1:	15
sum of 2*x:	20
sum of x*x:	30
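As a sanity check (my own addition, not part of the script above), the reason the class-based approach works is that instances of a module-level class pickle cleanly, so the Pool can ship the callable to its workers. A stripped-down, self-contained version of FuncApplier round-trips through pickle:

```python
import pickle

def double(x):
    return 2 * x

def square(x):
    return x * x

class FuncApplier:
    """Stripped-down copy of the class above, for a standalone check."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __call__(self, args_list):
        return tuple(f(a) for f, a in zip(self.funcs, args_list))

applier = FuncApplier((double, square))
# Round-trips fine: both the class and the wrapped functions are module-level.
restored = pickle.loads(pickle.dumps(applier))
print(restored((3, 3)))  # (6, 9)
```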
There are still some annoying limitations for me, because I often define local functions in my code.
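One partial escape hatch I'm aware of (my own assumption, not something the linked answer covers): functools.partial objects are picklable as long as the function they wrap is module-level, so simple local specializations can sometimes be rewritten as partials instead of nested defs:

```python
import pickle
from functools import partial
from operator import mul

# Instead of defining `double` as a local function inside another function,
# build it as a partial over a picklable module-level callable.
double = partial(mul, 2)

restored = pickle.loads(pickle.dumps(double))
print(restored(21))  # 42
```

This only covers specializations by argument binding; genuinely ad-hoc local logic still needs a module-level function or a callable class like FuncApplier.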