Python asyncio: running subprocess_exec on a worker thread

I am using the Python asyncio module (on Linux) to start a child process and then monitor it asynchronously. My code works fine when launched from the main thread, but when I run it on a worker thread, it hangs and the process_exited callback is never called.

I suspect this might be some kind of undocumented limitation of, or bug in, running subprocess_exec on a worker thread, probably related to how the implementation handles signals on a background thread. But it could also just be me screwing something up.

A simple, reproducible example is as follows:

    import asyncio

    class MyProtocol(asyncio.SubprocessProtocol):
        def __init__(self, done_future):
            super().__init__()
            self._done_future = done_future

        def pipe_data_received(self, fd, data):
            print("Received:", len(data))

        def process_exited(self):
            print("PROCESS EXITED!")
            self._done_future.set_result(None)

    @asyncio.coroutine
    def run(loop):
        done_future = asyncio.Future(loop=loop)
        transport = None
        try:
            transport, protocol = yield from loop.subprocess_exec(
                lambda: MyProtocol(done_future), "ls", "-lh", stdin=None)
            yield from done_future
        finally:
            if transport:
                transport.close()
        return done_future.result()

    def run_loop():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)  # bind the event loop to the current thread
        try:
            return loop.run_until_complete(run(loop))
        finally:
            loop.close()

So here I set up the asyncio event loop to run the shell command ls -lh, with one callback invoked when data is received from the subprocess and another invoked when the subprocess exits.

If I just call run_loop() directly from the main thread of the Python program, everything works fine. But if I do this instead:

    import threading

    t = threading.Thread(target=run_loop)
    t.start()
    t.join()

then pipe_data_received() is called successfully, but process_exited() is never called, and the program just hangs.
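For reference, here is roughly what I see (the byte count is just an illustration; it varies with the directory contents). From the main thread I get both lines; from the worker thread I only get the first line, and then the program hangs:

    Received: 4301
    PROCESS EXITED!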

After some googling and digging through the asyncio implementation in unix_events.py, I found that I might need to manually attach my event loop to the global child watcher object, like this:

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # bind the event loop to the current thread
    asyncio.get_child_watcher().attach_loop(loop)

Apparently, the child watcher is an (undocumented) object responsible for calling waitpid under the hood (or something like that). But when I tried this and ran run_loop() on the background thread, I got this error:

    File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
        raise RuntimeError(str(exc))
    RuntimeError: set_wakeup_fd only works in main thread

So it seems the implementation explicitly enforces that signal handlers can only be registered from the main thread, which leads me to believe that, in the current implementation, using subprocess_exec on a background thread is simply impossible without modifying the Python source code.
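To make my mental model concrete, here is a rough sketch of what I understand the child watcher to be doing. This is purely my own illustration, not the actual unix_events.py code, but it shows where the main-thread restriction bites:

    import os
    import signal

    class SketchChildWatcher:
        """Illustrative only: reap children via SIGCHLD + waitpid."""

        def __init__(self):
            self._callbacks = {}  # pid -> (callback, args)

        def attach_loop(self, loop):
            # add_signal_handler() ultimately calls signal.set_wakeup_fd(),
            # which Python only permits from the main thread -- this is
            # where the RuntimeError above comes from.
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        def add_child_handler(self, pid, callback, *args):
            # The subprocess transport registers its child's PID here.
            self._callbacks[pid] = (callback, args)

        def _sig_chld(self):
            # On SIGCHLD: reap every registered child that has exited and
            # fire its callback, which eventually leads to process_exited().
            for pid in list(self._callbacks):
                reaped, status = os.waitpid(pid, os.WNOHANG)
                if reaped == pid:
                    callback, args = self._callbacks.pop(pid)
                    callback(pid, status, *args)

If my understanding is roughly right, attach_loop() can only succeed on the main thread, which matches the error above.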

Am I right about this? Unfortunately, the asyncio module is very sparsely documented, so it's hard for me to be confident in my conclusion. I may simply be doing something wrong.

1 answer

Handling subprocesses on a worker thread works fine, as long as the event loop that owns the child watcher runs in the main thread:

    import asyncio

    # Instantiate the child watcher from the main thread
    asyncio.get_child_watcher()
    loop = asyncio.get_event_loop()
    # Run run_loop() (defined in the question) on a worker thread
    coro = loop.run_in_executor(None, run_loop)
    loop.run_until_complete(coro)
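As far as I can tell, the trick is that asyncio.get_child_watcher() is called on the main thread before the worker starts: it instantiates the default watcher and attaches it to the main-thread loop, so the SIGCHLD handler (and its set_wakeup_fd call) is registered where Python allows it. The main-thread loop, kept alive by run_until_complete(), then reaps the child and forwards the exit notification to the worker loop, which fires process_exited().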

See this post and the documentation.


Source: https://habr.com/ru/post/1269080/

