How can I use concurrent.futures and queues for a real-time script?

It is fairly easy to do concurrent work with the Python 3 module concurrent.futures, as shown below.

```python
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
```

It is also very convenient to insert and retrieve items in a queue.

```python
q = queue.Queue()
for task in tasks:
    q.put(task)
while not q.empty():
    q.get()
```
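Polling `q.empty()` works when every item is enqueued up front, but for a live stream a consumer usually blocks on `q.get()` and stops when it sees a sentinel value. A minimal sketch, assuming a `None` sentinel marks the end of the stream; `producer`, `consumer`, and `SENTINEL` are hypothetical names, not from the question:

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker


def producer(q, items):
    """Put each item on the queue, then signal the end of the stream."""
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def consumer(q, results):
    """Block on q.get() until the sentinel arrives."""
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        results.append(item * 2)


q = queue.Queue()
results = []
t = threading.Thread(target=consumer, args=(q, results))
t.start()
producer(q, [1, 2, 3])
t.join()
print(results)  # [2, 4, 6]
```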

I have a script running in the background that listens for updates. Suppose that, as these updates arrive, I queue them up and work on them concurrently using ThreadPoolExecutor.

Individually, all of these components work and make sense, but how can I use them together? I don't know whether it is possible to feed a ThreadPoolExecutor work from the queue in real time when the work items are not known in advance.

In short, all I want to do is receive updates, say 4 messages per second, put them into the queue, and have concurrent.futures work on them. Otherwise I am stuck with a sequential approach, which is slow.

Take the canonical example from the Python documentation below:

```python
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```

The list URLS is fixed. Is it possible to feed this list in real time and have the workers process items as they become available, possibly via a queue for management purposes? I'm a little confused: is my approach even possible?

2 answers

Here is the example from the Python docs, extended to take its work from a queue. Note that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed so that new work can be started while waiting for other work to finish.
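The distinction matters because `as_completed` iterates over a snapshot of the futures it was given and blocks until the next one finishes, whereas `wait` hands control back after the first completion or a timeout, so the loop can check the queue and submit new work. A small illustration; `sleepy` is a hypothetical stand-in task and the timings are arbitrary:

```python
import concurrent.futures
import time


def sleepy(seconds):
    """Simulated task: sleep, then return the duration."""
    time.sleep(seconds)
    return seconds


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(sleepy, 0.1)
    slow = executor.submit(sleepy, 1.0)
    # Returns as soon as at least one future finishes (or after 0.5 s),
    # giving the caller a chance to submit newly arrived work.
    done, not_done = concurrent.futures.wait(
        [fast, slow], timeout=0.5,
        return_when=concurrent.futures.FIRST_COMPLETED)
    print(fast in done, slow in not_done)  # True True
```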

```python
import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"


def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
```

The output, with each URL requested twice:

```
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
```
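The loop above needs network access. The same structure can be exercised offline by swapping `load_url` for a simulated task; `fake_task`, the item range, and the timings below are assumptions for illustration, not part of the original answer. Draining the queue before handling completions (as the original loop also does) means items queued just before the feeder returns are still submitted:

```python
import concurrent.futures
import queue
import time

q = queue.Queue()


def feed_the_workers(items, spacing):
    """Simulate outside actors sending in work at a fixed interval."""
    for item in items:
        time.sleep(spacing)
        q.put(item)
    return "DONE FEEDING"


def fake_task(n):
    """Hypothetical stand-in for load_url: a short delay, then a result."""
    time.sleep(0.05)
    return n * n


results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(feed_the_workers, range(5), 0.02): "FEEDER"}
    while futures:
        done, _ = concurrent.futures.wait(
            futures, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)
        # Submit everything that has arrived since the last pass.
        while not q.empty():
            item = q.get()
            futures[executor.submit(fake_task, item)] = item
        # Collect finished futures and stop tracking them.
        for future in done:
            label = futures.pop(future)
            if label != "FEEDER":
                results[label] = future.result()

print(sorted(results.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```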

At work, I ran into a situation where I wanted to process an unbounded stream of data in parallel. I wrote a small library inspired by the excellent answer already provided by Stephen Rauch.

Initially, I approached this problem by thinking in terms of two separate threads: one that puts work on the queue, and another that monitors the pool for completed tasks and makes room for new work. This is similar to Stephen Rauch's answer, where the feed_the_workers function runs in a separate thread.
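A minimal sketch of that two-thread design, assuming a `None` sentinel ends the stream and using a `threading.BoundedSemaphore` to cap the number of in-flight tasks. The semaphore is our choice for "making room for new work", not something taken from this answer or from Stephen Rauch's code; `producer`, `dispatcher`, and `task` are hypothetical names:

```python
import concurrent.futures
import queue
import threading
import time

SENTINEL = None  # hypothetical end-of-stream marker


def task(x):
    time.sleep(0.02)  # simulate work
    return x * x


def producer(q, items):
    """Thread 1: push incoming items onto the queue, then the sentinel."""
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def dispatcher(q, executor, slots, results, lock):
    """Thread 2: submit queued items, never more than `slots` allows in flight."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        slots.acquire()  # block until a worker slot frees up
        future = executor.submit(task, item)

        def on_done(f, item=item):
            try:
                outcome = f.result()
            except Exception as exc:  # keep failures instead of losing them
                outcome = exc
            with lock:
                results[item] = outcome
            slots.release()  # make room for the next item

        future.add_done_callback(on_done)


q = queue.Queue()
slots = threading.BoundedSemaphore(2)  # at most 2 tasks in flight
results, lock = {}, threading.Lock()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    threads = [
        threading.Thread(target=producer, args=(q, range(6))),
        threading.Thread(target=dispatcher,
                         args=(q, executor, slots, results, lock)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
# Leaving the with-block joins the worker threads, so every callback has run.
print(sorted(results.items()))
```

Note that `wait()` on the futures alone would not be enough here: a future can be reported done slightly before its callbacks finish, which is why the results are read only after the executor has shut down.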

A colleague helped me see that you can do everything in one thread if you define a buffered iterator that lets you control how many items are pulled from the input stream each time you are ready to submit more work to the thread pool.

So we introduce the BufferedIter class

```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals
```
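As written, `nextN` lets `StopIteration` escape once a finite iterator runs out, discarding any items already collected in `vals`. If the input may end, one possible variant uses `itertools.islice`, which simply returns a shorter (possibly empty) list; the caller then has to treat a short batch as end of stream. This is our sketch, not the author's library:

```python
import itertools


class BufferedIter(object):
    """Variant of BufferedIter that tolerates finite iterators."""

    def __init__(self, iterator):
        self.iter = iter(iterator)

    def nextN(self, n):
        # islice stops quietly at exhaustion instead of raising StopIteration,
        # so the last calls may return fewer than n items, or none.
        return list(itertools.islice(self.iter, n))


buf = BufferedIter(range(5))
print(buf.nextN(3), buf.nextN(3), buf.nextN(3))  # [0, 1, 2] [3, 4] []
```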

which allows us to define a stream processor as follows

```python
import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks


def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))
```
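With `ALL_COMPLETED` and a timeout of `WAIT_SLEEP`, a worker that finishes early can sit idle for up to `WAIT_SLEEP` seconds before it is refilled, and a finite input stream makes `nextN` raise `StopIteration`. One possible variation, sketched under our own assumptions, waits with `FIRST_COMPLETED`, pulls batches with `itertools.islice`, and returns once a finite stream is drained. `stream_processor_fc` is a hypothetical name, and the sketch omits the signal handling and logging of the original; like the original, it blocks while the input stream has no next item:

```python
import itertools
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def stream_processor_fc(input_stream, task, num_workers, poll=1.0):
    """Hypothetical variant: refill workers as soon as any task finishes,
    and return once a finite input stream is exhausted."""
    it = iter(input_stream)
    futures = {}
    results, failures = {}, {}
    exhausted = False
    with ThreadPoolExecutor(num_workers) as executor:
        while futures or not exhausted:
            idle = num_workers - len(futures)
            if not exhausted and idle > 0:
                batch = list(itertools.islice(it, idle))
                if len(batch) < idle:  # short batch: the stream has ended
                    exhausted = True
                for data in batch:
                    futures[executor.submit(task, data)] = data
            if not futures:
                break
            # Wake up as soon as any task completes (or after `poll` seconds).
            done, _ = wait(futures, timeout=poll, return_when=FIRST_COMPLETED)
            for f in done:
                data = futures.pop(f)
                try:
                    results[data] = f.result()
                except Exception as exc:
                    failures[data] = exc
    return results, failures


def square(x):
    time.sleep(0.01)
    if x == 4:
        raise ValueError('bad luck')
    return x * x


results, failures = stream_processor_fc(range(8), square, num_workers=3)
print(sorted(results), list(failures))  # [0, 1, 2, 3, 5, 6, 7] [4]
```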

Below is an example of how to use the stream processor.

```python
import itertools
import time


def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i


def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x


stream_processor(integers(), task, num_workers=3)
```

The output for this example is shown below.

```
2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
```

Source: https://habr.com/ru/post/1262788/

