How to continuously read from `stdin` without blocking?

I have one process that was created as follows:

p = subprocess.Popen(args = './myapp', stdin = subprocess.PIPE, stdout = subprocess.PIPE, universal_newlines=True) 

Later I try to write to p.stdin:

 p.stdin.write('my message\n') 

The myapp process is set up as follows:

    q = queue.Queue()

    def get_input():
        for line in iter(sys.stdin.readline, ''):
            q.put(line)
        sys.stdin.close()

    threading.Thread(name = 'input-getter', target = get_input).start()

It then tries to read new lines continuously, for example:

    try:
        print('input:', q.get_nowait())
    except Empty:
        print('no input')

Unfortunately, the subprocess never receives any of my messages. Of course, when I use:

 p.communicate('my message\n') 

the subprocess receives the message, but, as expected, the communicate method closes p.stdin, so no further communication can take place.

5 answers
    p = subprocess.Popen(args = './myapp', stdin = subprocess.PIPE, stdout = subprocess.PIPE, universal_newlines=True)

    while p.poll() is None:
        data = p.stdout.readline()

This reads from your process continuously until the process terminates. However, there are a few caveats you need to be aware of here. For example, if you also piped stderr but never read from it, you will most likely fill up a buffer or two and hang the program anyway. So always make sure you drain every piped stream when you handle the I/O manually.
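As an illustration (my own sketch, not part of the original answer), one common way to keep stderr drained is a small background thread that simply consumes it:

    import subprocess
    import threading

    p = subprocess.Popen('./myapp', stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)

    # Consume stderr in the background so its pipe buffer can never fill up
    # and block the child process.
    def drain_stderr():
        for line in p.stderr:
            pass  # or log the line somewhere useful

    threading.Thread(target=drain_stderr, daemon=True).start()

    while p.poll() is None:
        data = p.stdout.readline()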

A better alternative would be to use select.epoll() if you can. It is only available on Unix systems, but it gives you much better performance and error handling :)

    import select

    epoll = select.epoll()
    epoll.register(p.stdout.fileno(), select.EPOLLHUP)  # Use select.EPOLLIN for stdin.

    for fileno, event in epoll.poll(1):
        if fileno == p.stdout.fileno():
            pass  # ... Do something ...

NOTE: Remember that whenever a process is waiting for input, it usually indicates this via stdout, so you would still register stdout with select.epoll to check for "waiting for input". You can register select.EPOLLIN to check whether input has arrived, but I see little point in that, because whatever you feed into the process is something you already know is "happening".
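To make this concrete, here is a rough sketch (my own addition, not from the answer) that polls the child's stdout for readability and reads only what is already available, so the parent never blocks. It assumes a Unix system and an executable called ./myapp:

    import os
    import select
    import subprocess

    p = subprocess.Popen('./myapp', stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    epoll = select.epoll()
    epoll.register(p.stdout.fileno(), select.EPOLLIN | select.EPOLLHUP)

    while p.poll() is None:
        for fileno, event in epoll.poll(1):   # wait at most 1 second
            if event & select.EPOLLIN:
                # os.read returns whatever is ready right now; it does not wait
                # for a full line, so this never blocks once epoll says "readable".
                chunk = os.read(fileno, 4096)
                print('child said:', chunk.decode(errors='replace'))
            elif event & select.EPOLLHUP:
                epoll.unregister(fileno)

    epoll.close()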

Checking whether the process is waiting for input

You can use select.epoll to check whether the process is waiting for input, without blocking your application's execution, as in the example above. But there are better alternatives.

Pexpect is one library that does this really well, and it works with SSH, for example.

It works slightly differently from subprocess, but it may be a good alternative.
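For illustration, here is a minimal pexpect sketch (my own, not part of the answer; the './myapp' command and the 'ok' prompt string are placeholders you would replace with whatever your application actually prints):

    import pexpect

    child = pexpect.spawn('./myapp', encoding='utf-8')
    child.sendline('my message')
    # Block until the child prints its (hypothetical) acknowledgement...
    child.expect('ok')
    # ...then everything printed before the match is available here.
    print(child.before)
    child.close()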

Getting subprocess.popen to work with SSH

I'll redirect you to another question + answer if that is what you need (because SSH spawns its stdin in a protected manner):

Python + SSH password authentication (no external libraries or public/private keys)?


I think you may simply not be seeing the result of what is happening. Here is a complete example that seems to work on my box, unless I have completely misunderstood what you want. The main change I made was to set stdout for p to sys.stdout instead of subprocess.PIPE. Perhaps I am missing the point of your question, and that bit is crucial...

Here's the full code and output:

The sending (test) process (I called it test_comms.py). I'm on Windows at the moment, so apologies for the .bat:

    import time
    import subprocess
    import sys

    # Note I'm sending stdout to sys.stdout for observation purposes
    p = subprocess.Popen(args = 'myapp.bat', stdin = subprocess.PIPE, stdout = sys.stdout, universal_newlines=True)

    # Send 10 messages to the process stdin, 1 second apart
    for i in range(10):
        time.sleep(1)
        p.stdin.write('my message\n')

myapp.bat is trivial:

 echo "In the bat cave (script)" python myapp.py 

myapp.py contains (using Queue rather than queue, since my current environment is Python 2):

    import Queue
    from Queue import Empty
    import threading
    import sys
    import time

    def get_input():
        print("Started the listening thread")
        for line in iter(sys.stdin.readline, ''):
            print("line arrived to put on the queue\n")
            q.put(line)
        sys.stdin.close()

    print("Hi, I'm here via popen")
    q = Queue.Queue()
    threading.Thread(name = 'input-getter', target = get_input).start()
    print("stdin listener Thread created and started")

    # Read off the queue - note it being filled asynchronously based on
    # when it receives messages. I set the read interval below to 2 seconds
    # to illustrate the queue filling and emptying.
    while True:
        time.sleep(2)
        try:
            print('Queue size is', q.qsize())
            print('input:', q.get_nowait())
        except Empty:
            print('no input')

    print("Past my end of code...")

Output:

    D:\>comms_test.py
    D:\>echo "In the bat cave (script)"
    "In the bat cave (script)"
    D:\>python myapp.py
    Hi, I'm here via popen
    Started the listening threadstdin listener Thread created and started
    line arrived to put on the queue
    line arrived to put on the queue
    ('Queue size is', 2)
    ('input:', 'my message\n')
    line arrived to put on the queue
    line arrived to put on the queue
    ('Queue size is', 3)
    ('input:', 'my message\n')
    line arrived to put on the queue
    line arrived to put on the queue
    ('Queue size is', 4)
    ('input:', 'my message\n')
    line arrived to put on the queue
    line arrived to put on the queue
    ('Queue size is', 5)
    ('input:', 'my message\n')
    line arrived to put on the queue
    line arrived to put on the queue
    D:\>('Queue size is', 6)
    ('input:', 'my message\n')
    ('Queue size is', 5)
    ('input:', 'my message\n')
    ('Queue size is', 4)
    ('input:', 'my message\n')
    ('Queue size is', 3)
    ('input:', 'my message\n')
    ('Queue size is', 2)
    ('input:', 'my message\n')
    ('Queue size is', 1)
    ('input:', 'my message\n')
    ('Queue size is', 0)
    no input
    ('Queue size is', 0)
    no input
    ('Queue size is', 0)
    no input

In order for everything to work nicely, you must flush what the main process writes (p.stdin) and what the subprocess writes (sys.stdout).

communicate does both of these:

  • it closes p.stdin (which flushes it)
  • it waits for sys.stdout to be flushed (just before the subprocess exits)

A working main.py:

    import subprocess, time
    import sys

    p = subprocess.Popen(args = ['python3', './myapp.py'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, universal_newlines=True)
    time.sleep(0.5)
    p.stdin.write('my message\n')
    p.stdin.flush()
    #print("ici")
    for i, l in enumerate(iter(p.stdout.readline, ''), start=1):
        print("main:received:", i, repr(l))
        if i == 6:
            break
        print("mainprocess:send:other message n°{}".format(i))
        p.stdin.write("other message n°{}\n".format(i))
        p.stdin.flush()
    print("main:waiting for subprocess")
    p.stdin.close()
    p.wait()

And the corresponding example myapp.py:

    import queue, threading, sys, time

    q = queue.Queue()

    def get_input():
        for line in iter(sys.stdin.readline, ''):
            q.put(line)
        sys.stdin.close()

    threading.Thread(name = 'input-getter', target = get_input).start()

    for i in range(6):
        try:
            l = q.get_nowait()
            print('myapp:input:', l, end="")
            sys.stdout.flush()
        except queue.Empty:
            print("myapp:no input")
            sys.stdout.flush()
        time.sleep(1)

result:

    main:received: 1 'myapp:no input\n'
    mainprocess:send:other message n°1
    main:received: 2 'myapp:input: my message\n'
    mainprocess:send:other message n°2
    main:received: 3 'myapp:input: other message n°1\n'
    mainprocess:send:other message n°3
    main:received: 4 'myapp:no input\n'
    mainprocess:send:other message n°4
    main:received: 5 'myapp:input: other message n°2\n'
    mainprocess:send:other message n°5
    main:received: 6 'myapp:input: other message n°3\n'
    main:waiting for subprocess

While trying to investigate your program, I wrote my own "continuously pump stuff into cat and catch what it returns" program. I didn't implement your subprocess, but hopefully the structure is similar.

This part of your program looks very strange...

    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

It looks an awful lot like

    for line in sys.stdin:
        q.put(line)

Note that the loop ends when the pipe is closed, and there is no need to close it again afterwards.

If you need to read stdin continuously and asynchronously, you should be able to build a reader thread much like child_reader in the code below. Just replace child.stdout with sys.stdin.

    import subprocess
    import threading
    import random

    # We may need to guard this?
    child = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)

    # Continuously print what the process outputs...
    def print_child():
        for line in child.stdout:
            print(line)

    child_reader = threading.Thread(target = print_child)
    child_reader.start()

    for i in range(10000):
        chars = 'ABC\n'
        child.stdin.write(random.choice(chars).encode())

    # Send EOF.
    # This kills the cat.
    child.stdin.close()

    # I don't think order matters here?
    child.wait()
    child_reader.join()

I wrote a program that does... basically everything I/O-related asynchronously. It reads input in a thread, it writes output in a thread, it creates a process, and it communicates with that process in a thread.

I'm not sure exactly what your program needs to do, but hopefully this code demonstrates it.

    # Asynchronous cat program!
    # Asynchronously read stdin
    # Pump the results into a threadsafe queue
    # Asynchronously feed the contents to cat
    # Then catch the output from cat and print it
    # Thread all the things

    import subprocess
    import threading
    import queue
    import sys

    my_queue = queue.Queue()

    # Input!
    def input_method():
        for line in sys.stdin:  # End on EOF
            if line == 'STOP\n':  # Also end on STOP
                break
            my_queue.put(line)

    input_thread = threading.Thread(target=input_method)
    input_thread.start()
    print('Input thread started')

    # Subprocess!
    cat_process = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    print('cat process started')

    queue_alive = True

    # Continuously dump the queue into cat
    def queue_dump_method():
        while queue_alive:
            try:
                line = my_queue.get(timeout=2)
                cat_process.stdin.write(line.encode())
                cat_process.stdin.flush()  # For some reason, we have to manually flush
                my_queue.task_done()  # Needed?
            except queue.Empty:
                pass

    queue_dump_thread = threading.Thread(target = queue_dump_method)
    queue_dump_thread.start()
    print('Queue dump thread started')

    # Output!
    def output_method():
        for line in cat_process.stdout:
            print(line)

    output_thread = threading.Thread(target=output_method)
    output_thread.start()
    print('Output thread started')

    # input_thread will die when we type STOP
    input_thread.join()
    print('Input thread joined')

    # Now we wait for the queue to finish processing
    my_queue.join()
    print('Queue empty')

    queue_alive = False
    queue_dump_thread.join()
    print("Queue dump thread joined")

    # Send EOF to cat.
    # This kills the cat.
    cat_process.stdin.close()
    cat_process.wait()
    print('cat process done')

    # And make sure we're done outputting
    output_thread.join()
    print('Output thread joined')
