You can use a Queue to feed failures back to the Pool, inside a loop in the initiating process:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        fq.put(x)
        return None
    return x*x

def f_init(q):
    global fq  # make the queue visible to f() in each worker
    fq = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])  # None -> one worker per CPU
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])

        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)

    p.close()
    p.join()

    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))
The output is as follows:
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
A Pool cannot be shared between multiple processes, which is why this approach relies on a Queue instead. If you try to pass the pool as a parameter to the pool's own processes, you will get this error:
NotImplementedError: pool objects cannot be passed between processes or pickled
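For reference, a minimal sketch that triggers this error (the worker function g is just a placeholder):

import multiprocessing as mp

def g(pool):
    return None

if __name__ == '__main__':
    p = mp.Pool(2)
    p.apply(g, [p])  # pickling the Pool to send it to a worker raises NotImplementedError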
You can also attempt a few quick retries inside your f function itself to avoid the synchronization overhead. It really comes down to how long your function should wait before retrying, and how likely it is to succeed if retried immediately.
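A minimal sketch of that idea, reusing the f from above (the quick_tries count is an arbitrary choice, and random.getrandbits(1) again stands in for the real work / exception catch):

import random

def f(x, quick_tries=3):
    for _ in range(quick_tries):
        if not random.getrandbits(1):  # stand-in for: the work succeeded
            return x*x
    # still failing after a few immediate attempts; hand off to the queue
    fq.put(x)
    return None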
Old answer: For completeness, here is my old answer. It is not as optimal as resubmitting to the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle/limit n-level retries (see the sketch at the end of this answer):
You can use a Queue to aggregate failures and resubmit them at the end of each run, over several runs:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        fq.put(x)
        return None
    return x*x

def f_init(q):
    global fq  # make the queue visible to f() in each worker
    fq = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        p.join()

        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if r is not None]
        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed: %s" % (run_number, failed_items))
        print()
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))
with the output as follows:
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed: [2, 7]

(4) Succeeded: [49]
(4) Failed: [2]

(5) Succeeded: [4]
(5) Failed: []
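Since each pass of the while pending loop is one retry level, capping the number of runs gives the n-level limit mentioned above. A minimal sketch of that variant, with a hypothetical max_runs parameter:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        fq.put(x)
        return None
    return x*x

def f_init(q):
    global fq
    fq = q

def main(pending, max_runs=3):  # max_runs: hypothetical cap on retry levels
    run_number = 1
    while pending and run_number <= max_runs:
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        successful = [r for r in p.imap(f, pending) if r is not None]
        p.close()
        p.join()

        pending = []
        while not q.empty():
            pending.append(q.get())
        print("(%d) Succeeded: %s" % (run_number, successful))
        run_number += 1
    if pending:
        print("Giving up after %d runs on: %s" % (max_runs, pending))

if __name__ == '__main__':
    main(range(1, 10))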