Parallel recursive function in python?

How to parallelize a recursive function in Python?

My function looks like this:

def f(x, depth): if x==0: return ... else : return [x] + map(lambda x:f(x, depth-1), list_of_values(x)) def list_of_values(x): # heavy compute, pure function 

When you try to parallelize it using multiprocessing.Pool.map , windows open an infinite number of processes and freeze.

What is a good (preferably simple) way to parallelize it (for one multi-core machine)?

Here is the code that hangs:

 from multiprocessing import Pool pool = pool(processes=4) def f(x, depth): if x==0: return ... else : return [x] + pool.map(lambda x:f(x, depth-1), list_of_values(x)) def list_of_values(x): # heavy compute, pure function 
+4
source share
2 answers

ok, sorry for the problems with this.

I'm going to answer a slightly different question, where f() returns the sum of the values ​​in a list. this is because it is not clear to me from your example what the return type is f() , and using the whole code makes the code understandable.

this is difficult because two different things happen in parallel:

  • calculating an expensive function in a pool
  • recursive decomposition f()

I am very careful to use the pool to calculate an expensive function. Thus, we do not get an “explosion” of processes. but since it is asynchronous, we need to defer most of the work for the callback that the worker calls when an expensive function is executed.

furthermore, we need to use the countdown latch so that we know when all of the individual sub-titles f() complete.

there may be a simpler way (I'm sure there are, but I need to do other things), but maybe this gives you an idea of ​​what is possible:

 from multiprocessing import Pool, Value, RawArray, RLock from time import sleep class Latch: '''A countdown latch that lets us wait for a job of "n" parts''' def __init__(self, n): self.__counter = Value('i', n) self.__lock = RLock() def decrement(self): with self.__lock: self.__counter.value -= 1 print('dec', self.read()) return self.read() == 0 def read(self): with self.__lock: return self.__counter.value def join(self): while self.read(): sleep(1) def list_of_values(x): '''An expensive function''' print(x, ': thinking...') sleep(1) print(x, ': thought') return list(range(x)) pool = Pool() def async_f(x, on_complete=None): '''Return the sum of the values in the expensive list''' if x == 0: on_complete(0) # no list, return 0 else: n = x # need to know size of result beforehand latch = Latch(n) # wait for n entires to be calculated result = RawArray('i', n+1) # where we will assemble the map def delayed_map(values): '''This is the callback for the pool async process - it runs in a separate thread within this process once the expensive list has been calculated and orchestrates the mapping of f over the result.''' result[0] = x # first value in list is x for (v, i) in enumerate(values): def callback(fx, i=i): '''This is the callback passed to f() and is called when the function completes. If it is the last of all the calls in the map then it calls on_complete() (ie another instance of this function) for the calling f().''' result[i+1] = fx if latch.decrement(): # have completed list # at this point result contains [x]+map(f, ...) on_complete(sum(result)) # so return sum async_f(v, callback) # Ask worker to generate list then call delayed_map pool.apply_async(list_of_values, [x], callback=delayed_map) def run(): '''Tie into the same mechanism as above, for the final value.''' result = Value('i') latch = Latch(1) def final_callback(value): result.value = value latch.decrement() async_f(6, final_callback) latch.join() # wait for everything to complete return result.value print(run()) 

ps I am using python3.2, and the ugliness above is that we delay the calculation of the final results (returning to the tree) until a later time. perhaps something like generators or futures could make things easier.

In addition, I suspect that you need a cache to avoid unnecessarily recounting an expensive function when called with the same argument as before.

see also yaniv answer - parallel recursive function in python? - This is, apparently, an alternative way to change the order of assessment, indicating the depth.

+5
source

Thinking about this, I found a simple, not complete, but good enough answer:

 # a partially parallel solution , just do the first level of recursion in paralell. it might be enough work to fill all cores. import multiprocessing def f_helper(data): return f(x=data['x'],depth=data['depth'], recursion_depth=data['recursion_depth']) def f(x, depth, recursion_depth): if depth==0: return ... else : if recursion_depth == 0: pool = multiprocessing.Pool(processes=4) result = [x] + pool.map(f_helper, [{'x':_x, 'depth':depth-1, 'recursion_depth':recursion_depth+1 } _x in list_of_values(x)]) pool.close() else: result = [x] + map(f_helper, [{'x':_x, 'depth':depth-1, 'recursion_depth':recursion_depth+1 } _x in list_of_values(x)]) return result def list_of_values(x): # heavy compute, pure function 
+2
source

Source: https://habr.com/ru/post/1369396/


All Articles