At work, I ran into a situation where I needed to process an unbounded data stream in parallel. I created a small library inspired by the excellent answer already provided by Stephen Rauch.
Initially, I approached this problem with two separate threads: one sends work to the queue, and the other monitors the queue for completed tasks and frees up space for new work. This is similar to what Stephen Rauch suggested, where work is fed to the pool by the feed_the_workers function running in a separate thread.
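As a rough illustration of that two-thread design (a sketch under my own assumptions, not Stephen Rauch's code), a bounded `queue.Queue` of pending futures can provide the back-pressure: a feeder thread submits tasks and blocks whenever the queue is full, while the main thread consumes results and thereby frees space. The names `two_thread_demo` and `square` are hypothetical, and a finite `range` stands in for the stream.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical stand-in for the real work.
    return x * x


def two_thread_demo(n_items=10, max_pending=3):
    pending = queue.Queue(maxsize=max_pending)  # bounds how much work is queued
    results = []
    with ThreadPoolExecutor(max_workers=max_pending) as pool:

        def feeder():
            for x in range(n_items):
                # put() blocks while max_pending futures are waiting to be consumed.
                pending.put(pool.submit(square, x))
            pending.put(None)  # sentinel: no more work

        t = threading.Thread(target=feeder)
        t.start()
        # Main thread: take futures in submission order and collect results.
        while (fut := pending.get()) is not None:
            results.append(fut.result())
        t.join()
    return results
```

Note that this sketch does not handle task exceptions: if `fut.result()` raised, the feeder could stay blocked on `put()`.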
A colleague helped me realize that you can do everything in a single thread if you define a buffered iterator that lets you control how many elements are taken from the input stream each time you are ready to send more work to the thread pool.
So here is the BufferedIter class:
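```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        # Pull the next n items from the underlying iterator.
        # If the stream ends mid-batch, next() raises StopIteration.
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals
```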
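One caveat with `nextN` as written: on a finite stream, `next()` raises `StopIteration` partway through a batch and the items already collected in `vals` are lost. A hedged alternative, assuming you would rather get a short final batch, is to build `nextN` on `itertools.islice`, so an empty list signals the end of the stream. Wrapping the input in `iter()` also lets it accept any iterable, not just iterators.

```python
import itertools


class BufferedIter:
    def __init__(self, iterable):
        # iter() makes repeated islice calls continue where the last one stopped.
        self.iter = iter(iterable)

    def nextN(self, n):
        # Returns up to n items; an empty list means the stream is exhausted.
        return list(itertools.islice(self.iter, n))


buf = BufferedIter(range(5))
print(buf.nextN(2))  # [0, 1]
print(buf.nextN(2))  # [2, 3]
print(buf.nextN(2))  # [4]
print(buf.nextN(2))  # []
```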
which lets us define a stream processor as follows:
```python
import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1
```
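For illustration, here is one hypothetical way a `stream_processor(input_stream, task, num_workers)` function could be built on a buffered iterator. It is a minimal sketch under my own assumptions, not necessarily the original implementation: it keeps at most `num_workers` tasks in flight, waits with `FIRST_COMPLETED` (the imports above suggest `ALL_COMPLETED` may have been used instead), refills freed slots from the stream, stops pulling new work after SIGINT while draining in-flight tasks, and returns the success/failure counts. `BufferedIter` is redefined with `islice` to keep the block self-contained, and the SIGINT handler is only installed in the main thread because `signal.signal()` raises `ValueError` elsewhere.

```python
import itertools
import logging
import signal
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

log = logging.getLogger(__name__)


class BufferedIter:
    def __init__(self, iterable):
        self.iter = iter(iterable)

    def nextN(self, n):
        # Up to n items; an empty list means the stream is exhausted.
        return list(itertools.islice(self.iter, n))


def stream_processor(input_stream, task, num_workers):
    """Hypothetical sketch: keep at most num_workers tasks in flight."""
    shutdown = False

    def handle_sigint(signum, frame):
        nonlocal shutdown
        log.info('Caught signal %d, shutting down gracefully ...', signum)
        shutdown = True

    # signal.signal() may only be called from the main thread.
    in_main = threading.current_thread() is threading.main_thread()
    previous = signal.signal(signal.SIGINT, handle_sigint) if in_main else None

    buffered = BufferedIter(input_stream)
    num_success = num_failure = 0
    try:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            # Map each pending future to its input so it can be logged later.
            futures = {pool.submit(task, x): x for x in buffered.nextN(num_workers)}
            while futures:
                done, _ = wait(futures, return_when=FIRST_COMPLETED)
                for fut in done:
                    x = futures.pop(fut)
                    try:
                        result = fut.result()
                    except Exception as exc:
                        num_failure += 1
                        log.info('future encountered an exception: %r, %s', x, exc)
                    else:
                        num_success += 1
                        log.info('future finished successfully: %r -> %r', x, result)
                if not shutdown:
                    # Refill freed slots; an empty batch means the stream ended.
                    for x in buffered.nextN(num_workers - len(futures)):
                        futures[pool.submit(task, x)] = x
    finally:
        # Restore whatever SIGINT handler was active before.
        if previous is not None:
            signal.signal(signal.SIGINT, previous)
    log.info('num_success=%d, num_failure=%d', num_success, num_failure)
    return num_success, num_failure
```

Waiting on `FIRST_COMPLETED` means one slow task does not hold back the others; with `ALL_COMPLETED` the stream would be processed in lockstep batches of `num_workers`.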
Below is an example of how to use the stream processor.
```python
import itertools
import time


def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i


def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x


stream_processor(integers(), task, num_workers=3)
```
The output for this example is shown below.
```
2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
```