Retrying Python multiprocessing attempts

Is there a way to resubmit a piece of data for processing if the original computation failed, using a simple Pool?

    import random
    from multiprocessing import Pool

    def f(x):
        if random.getrandbits(1):
            raise ValueError("Retry this computation")
        return x*x

    p = Pool(5)
    # If one of these f(x) calls fails, retry it with another (or same) process
    p.map(f, [1,2,3])
2 answers

If you can (or don't mind) retrying immediately, use a decorator that wraps the function:

    import random
    from functools import wraps
    from multiprocessing import Pool

    def retry(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            while True:
                try:
                    return f(*args, **kwargs)
                except ValueError:
                    pass
        return wrapped

    @retry
    def f(x):
        if random.getrandbits(1):
            raise ValueError("Retry this computation")
        return x*x

    p = Pool(5)
    # If one of these f(x) calls fails, retry it with another (or same) process
    p.map(f, [1,2,3])
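If a failure might not be transient, an unbounded `while True` can spin forever. A capped variant is a small change; the following is a minimal sketch, where the `max_attempts` parameter and the `retry_with_limit` name are my own additions, not part of the answer above:

    import random
    from functools import wraps
    from multiprocessing import Pool

    def retry_with_limit(max_attempts=3):
        """Retry the wrapped function up to max_attempts times, then re-raise."""
        def decorator(f):
            @wraps(f)
            def wrapped(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return f(*args, **kwargs)
                    except ValueError:
                        if attempt == max_attempts - 1:
                            raise  # out of attempts: propagate the error to the caller
            return wrapped
        return decorator

    @retry_with_limit(max_attempts=5)
    def f(x):
        if random.getrandbits(1):
            raise ValueError("Retry this computation")
        return x*x

    if __name__ == '__main__':
        p = Pool(5)
        print(p.map(f, [1, 2, 3]))

If an item still fails after the last attempt, the exception propagates out of `p.map` in the parent, so you can decide there whether to give up or resubmit.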

You can use a Queue to feed failures back into the Pool through a loop in the initiating process:

    import multiprocessing as mp
    import random

    def f(x):
        if random.getrandbits(1):
            # on failure / exception catch
            fq.put(x)
            return None
        return x*x

    def f_init(q):
        global fq
        fq = q

    def main(pending):
        total_items = len(pending)
        successful = []
        failure_tracker = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, pending)
        retry_results = []
        while len(successful) < total_items:
            successful.extend([r for r in results if r is not None])
            successful.extend([r for r in retry_results if r is not None])

            failed_items = []
            while not q.empty():
                failed_items.append(q.get())
            if failed_items:
                failure_tracker.append(failed_items)
                retry_results = p.imap(f, failed_items)

        p.close()
        p.join()

        print("Results: %s" % successful)
        print("Failures: %s" % failure_tracker)

    if __name__ == '__main__':
        main(range(1, 10))

The output is as follows:

    Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
    Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

A Pool cannot be shared between multiple processes, which is why this approach relies on a Queue. If you try to pass the pool as a parameter to the pool's own processes, you get this error:

    NotImplementedError: pool objects cannot be passed between processes or pickled
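For example, this minimal sketch (my own illustration, not from the original answer) reproduces the error by pickling a Pool directly, which is what would happen implicitly if you passed one to a worker:

    import pickle
    from multiprocessing import Pool

    if __name__ == '__main__':
        p = Pool(2)
        try:
            # Sending a pool to a worker would require pickling it;
            # doing so directly raises the same NotImplementedError.
            pickle.dumps(p)
        except NotImplementedError as e:
            print(e)
        p.close()
        p.join()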

You could also add a few quick retries inside your f function itself to avoid the synchronization overhead. It really comes down to how long your function should wait before retrying, and how likely it is to succeed when retried immediately.
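As a rough sketch of that idea (the `MAX_LOCAL_TRIES` constant is an assumption of mine, and this reuses the `fq` queue and `f_init` setup from the code above), the worker retries a couple of times locally and only reports a failure once those attempts are exhausted:

    MAX_LOCAL_TRIES = 3  # assumed value; tune to how likely an immediate retry is to succeed

    def f(x):
        # Try a few times inside the worker before touching the queue,
        # so queue synchronization is only paid for persistent failures.
        for _ in range(MAX_LOCAL_TRIES):
            if not random.getrandbits(1):  # "success" in this toy example
                return x*x
        fq.put(x)  # still failing after local retries: hand back for a pool-level retry
        return None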


Old answer: For completeness, here is my original answer. It is not as optimal as resubmitting directly to the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle or limit n-level retries:

You can use a Queue to collect failures and resubmit them at the end of each run, over multiple runs:

    import multiprocessing as mp
    import random

    def f(x):
        if random.getrandbits(1):
            # on failure / exception catch
            fq.put(x)
            return None
        return x*x

    def f_init(q):
        global fq
        fq = q

    def main(pending):
        run_number = 1
        while pending:
            jobs = pending
            pending = []

            q = mp.Queue()
            p = mp.Pool(None, f_init, [q])
            results = p.imap(f, jobs)
            p.close()
            p.join()

            failed_items = []
            while not q.empty():
                failed_items.append(q.get())

            successful = [r for r in results if r is not None]
            print("(%d) Succeeded: %s" % (run_number, successful))
            print("(%d) Failed: %s" % (run_number, failed_items))
            print()
            pending = failed_items
            run_number += 1

    if __name__ == '__main__':
        main(range(1, 10))

with the output as follows:

    (1) Succeeded: [9, 16, 36, 81]
    (1) Failed: [2, 1, 5, 7, 8]

    (2) Succeeded: [64]
    (2) Failed: [2, 1, 5, 7]

    (3) Succeeded: [1, 25]
    (3) Failed: [2, 7]

    (4) Succeeded: [49]
    (4) Failed: [2]

    (5) Succeeded: [4]
    (5) Failed: []
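To make the "limit n-level retries" point concrete, the `while pending` loop can be bounded instead of running until everything succeeds. A sketch of only the modified `main`, assuming the same `f` and `f_init` as above and a `max_runs` parameter that is not in the original answer:

    def main(pending, max_runs=3):
        run_number = 1
        while pending and run_number <= max_runs:
            jobs = pending
            pending = []

            q = mp.Queue()
            p = mp.Pool(None, f_init, [q])
            results = p.imap(f, jobs)
            p.close()
            p.join()

            failed_items = []
            while not q.empty():
                failed_items.append(q.get())

            successful = [r for r in results if r is not None]
            print("(%d) Succeeded: %s" % (run_number, successful))
            print("(%d) Failed: %s" % (run_number, failed_items))
            pending = failed_items
            run_number += 1

        if pending:
            # Items still failing after max_runs rounds are reported, not retried
            print("Giving up after %d runs; still failing: %s" % (max_runs, list(pending)))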

Source: https://habr.com/ru/post/920762/

