Asyncio: Wait for an event from another thread

I am developing a Python application that needs to access a machine to perform some (long-running) tasks. asyncio seems like a good choice for anything that talks over the network, but I also need to access the serial port for one specific component. I have implemented some abstraction around the actual serial-port hardware, but I cannot figure out how to integrate it sensibly with asyncio.

My setup: I have a thread running a loop that regularly talks to the machine and decodes the responses. Using the enqueue_query() method, I can put a query string into a queue; it is then sent to the machine by that other thread and triggers a response. By passing in a threading.Event (or anything with a set() method), the caller can block until the response has arrived. It looks roughly like this:

    f = threading.Event()
    ch.enqueue_query('2 getnlimit', f)
    f.wait()
    print(ch.get_query_responses())

Now my goal is to put these lines into a coroutine and await the event asynchronously, so that the application can do something else in the meantime. How can I do this? Wrapping f.wait() in an executor would probably work, but it seems a little silly, as it creates a new thread just to wait for another thread to do something.

Thanks! Best regards, Philipp

2 answers

By passing in a threading.Event (or anything with a set() method), the caller can block until the response has arrived.

Given that behavior of your query function, all you need is a thread-safe version of asyncio.Event. It is just three lines of code:

    import asyncio

    class Event_ts(asyncio.Event):
        # TODO: clear() method
        def set(self):
            # FIXME: the _loop attribute is not documented as a public API!
            self._loop.call_soon_threadsafe(super().set)
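Because _loop is a private attribute (and on Python 3.10+ asyncio.Event no longer binds to a loop at construction time, so _loop may not even be set before the first wait()), a slightly safer variant is to capture the event loop explicitly when the event is created. The sketch below is only an illustration, with ThreadSafeEvent as a made-up name; it assumes the event is constructed inside the running loop (as in the test below) and that clear() is only ever called from the loop's own thread:

    import asyncio

    class ThreadSafeEvent(asyncio.Event):
        """Hypothetical variant: set() may be called from any thread."""

        def __init__(self):
            super().__init__()
            # Remember the loop that owns this event instead of relying on
            # the private _loop attribute.
            self._owner_loop = asyncio.get_running_loop()

        def set(self):
            # Hand the real set() over to the owning loop's thread.
            self._owner_loop.call_soon_threadsafe(super().set)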

Functionality Test:

    def threaded(event):
        import time
        while True:
            event.set()
            time.sleep(1)

    async def main():
        import threading
        e = Event_ts()
        threading.Thread(target=threaded, args=(e,)).start()
        while True:
            await e.wait()
            e.clear()
            print('whatever')

    asyncio.ensure_future(main())
    asyncio.get_event_loop().run_forever()
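On Python 3.7 and later, the same test can be started with asyncio.run(), which creates and closes the loop itself (a minor modernization; main() here never returns, so stop it with Ctrl+C):

    # Python 3.7+: asyncio.run() manages the event loop for you.
    asyncio.run(main())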

The easiest way to do what you suggested is to wrap the f.wait() call in an executor:

    @asyncio.coroutine
    def do_enqueue():
        f = threading.Event()
        ch.enqueue_query('2 getnlimit', f)
        yield from loop.run_in_executor(None, f.wait)
        print(ch.get_query_responses())
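The @asyncio.coroutine / yield from style shown above is the pre-Python-3.5 syntax (and was removed in Python 3.11). On current versions the same idea can be written with async def, and on 3.9+ asyncio.to_thread() hides the executor plumbing. This is only a sketch, assuming the ch object and its enqueue_query()/get_query_responses() methods from the question:

    import asyncio
    import threading

    async def do_enqueue():
        f = threading.Event()
        ch.enqueue_query('2 getnlimit', f)   # ch is the serial-port wrapper from the question
        # Run the blocking wait in a worker thread so the event loop stays responsive.
        await asyncio.to_thread(f.wait)
        print(ch.get_query_responses())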

You do take on the overhead of starting a thread pool (at least for the first call; the pool stays in memory from that point on), but any solution that provides something like threading.Event with both thread-safe blocking and non-blocking APIs, without relying on any background threads internally, would be considerably more involved.


Source: https://habr.com/ru/post/1233195/