Asynchronous bidirectional I / O for a child process

I am trying to figure out a generic way for asynchronous bi-directional I / O redirection of a child process. Basically, I would like to create an interactive child process awaiting input, and any output should be read. I tried experimenting with python.subprocess by creating a new python process. An example of a simplified basis that I tried to achieve is as follows

process = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE) while True: output = process.stdout.readline() print output input = sys.stdin.readline() process.stdin.write(input) 

and the execution of the above code fragment just hangs without output. I tried working with /usr/bash and /usr/bin/irb , but the result is still the same. I assume that buffered IO just doesn't gel well with IO redirection.

So my question is: is it possible to read the output of the child process without flushing the buffer or exiting the subprocess?

The following post mentions IPC sockets, but for this I would have to modify the child process, which might not be possible. Is there any other way to achieve it?

Note *** My ultimate goal is to create a server-side REPL process that can communicate with a remote web client. Although the above example is for Python, my ultimate goal is to complete all available REPL with a generic shell.


With some suggestion in the answers, I came up with the following

 #!/usr/bin/python import subprocess, os, select proc = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE) for i in xrange(0,5): inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() proc.terminate() print "After Terminating" for i in xrange(0,5): inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() 

Now, although the programs are not at an impasse, but unfortunately there is no way out. By executing the above code, I get

 No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] After Terminating No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] 

Just FYI, running python like

 /usr/bin/python 2>&1|tee test.out 

seems to be working fine.

I also came up with the 'C' code. But the result is no different.

 int kbhit() { struct timeval tv; fd_set fds; tv.tv_sec = tv.tv_usec = 0; FD_ZERO(&fds); FD_SET(STDIN_FILENO, &fds); select(STDIN_FILENO+1, &fds, NULL, NULL, &tv); return FD_ISSET(STDIN_FILENO, &fds); } void receive(char *str) { char ch; fprintf(stderr,"IN1\n"); if(!kbhit()) return; fprintf(stderr,"IN2\n"); fprintf(stderr,"%d\n",kbhit()); for(;kbhit() && (ch=fgetc(stdin))!=EOF;) { fprintf(stderr,"%c,%d",ch,kbhit()); } fprintf(stderr,"Done\n"); } int main(){ pid_t pid; int rv, pipeP2C[2],pipeC2P[2]; pipe(pipeP2C); pipe(pipeC2P); pid=fork(); if(pid){ dup2(pipeP2C[1],1); /* Replace stdout with out side of the pipe */ close(pipeP2C[0]); /* Close unused side of pipe (in side) */ dup2(pipeC2P[0],0); /* Replace stdin with in side of the pipe */ close(pipeC2P[1]); /* Close unused side of pipe (out side) */ setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */ sleep(2); receive("quit()\n"); wait(&rv); /* Wait for child process to end */ fprintf(stderr,"Child exited with a %d value\n",rv); } else{ dup2(pipeP2C[0],0); /* Replace stdin with the in side of the pipe */ close(pipeP2C[1]); /* Close unused side of pipe (out side) */ dup2(pipeC2P[1],1); /* Replace stdout with the out side of the pipe */ close(pipeC2P[0]); /* Close unused side of pipe (out side) */ setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */ close(2), dup2(1,2); /*Redirect stderr to stdout */ if(execl("/usr/bin/python","/usr/bin/python",NULL) == -1){ fprintf(stderr,"execl Error!"); exit(1); } } return 0; } 
+4
source share
5 answers

In the Python code you posted, you are not using the correct threads:

 inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], # read list [proc.stdout, proc.stderr], # write list [proc.stdout, proc.stderr], # error list. 0) # time out. 

I have not tried to fix it, but I am sure that reading and writing to the same set of streams is incorrect.


There are several errors in your example. Firstly, the python executable that you run as a child process does not output. Secondly, there is a race condition, since you can call select() 5 times in a row before the child process will output, in which case you will kill the process before reading anything.

I fixed the three problems mentioned above (recording list, starting a process that produces an exit and race condition). Try this sample and see if it works for you:

 #!/usr/bin/python import subprocess, os, select, time path = "/usr/bin/python" proc = subprocess.Popen([path, "foo.py"], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() proc.terminate() print "After Terminating" for i in xrange(0,5): inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() 

The foo.py file that I used contained the following:

 #!/usr/bin/python print "Hello, world!" 

The next version (mostly remote redundant output to simplify reading the results):

 #!/usr/bin/python import subprocess, os, select, time path = "/usr/bin/python" proc = subprocess.Popen([path, "foo.py"], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) for s in inputready: line = s.readline() if line: print s.fileno(), line proc.terminate() print "After Terminating" for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) for s in inputready: line = s.readline() if line: print s.fileno(), line 

Gives the following output:

5 Hello world!

After finishing

Note that for some reason, using the timeout parameter in select.select() did not produce the expected results on my system, and I resorted to using time.sleep() .


Just FYI, running python like

 /usr/bin/python 2>&1|tee test.out 

seems to be working fine.

You cannot get this effect because this example still gives the python interpreter control tty. Without a tty control, the python interpreter does not print the Python version and does not display the >>> prompt.

A close example might be the following. You can replace /dev/null with a file containing commands to send to the interpreter.

 /usr/bin/python </dev/null 2>&1|tee test.out 

If you redirect nothing but the control tty (keyboard) as standard input to the process, you will not get the output from the python interpreter. This is why your code is not working.

+1
source

There is another way to do this. You can, for example:

  • use SysV message queues and timeout polling in the queue to send a message
  • create for the child object and pipe () for the father and () for the father using the O_NONBLOCK flag, and then select () in the file descriptors to receive data (to even process timeouts if no data arrives)
  • use socket () AF_UNIX or AF_INET, install it without blocking and select () or epoll () to receive data.
  • mmap () MAP_SHARED memory segments and signal another process when data arrives, pay attention to the shared segment using the locking mechanism.

I wrote a sample in C with double pipes:

 #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <unistd.h> #include <sys/time.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/stat.h> #include <sys/select.h> #include <fcntl.h> #include <signal.h> #define BUFLEN (6*1024) #define EXECFILE "/usr/bin/python" char *itoa(int n, char *s, int b) { static char digits[] = "0123456789abcdefghijklmnopqrstuvwxyz"; int i=0, sign; if ((sign = n) < 0) n = -n; do { s[i++] = digits[n % b]; } while ((n /= b) > 0); if (sign < 0) s[i++] = '-'; s[i] = '\0'; return s; } /* int set_nonblock(int sockfd) { // set socket to non blocking int arg,i; if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) { printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno); return -1; } // set O_NONBLOCK flag arg |= O_NONBLOCK; if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) { printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno); return -1; } return i; } int set_block(int sockfd) { // set socket to blocking int arg,i; if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) { printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno); return -1; } // clean O_NONBLOCK flag arg &= (~O_NONBLOCK); if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) { printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno); return -1; } return i; } */ int main() { FILE *input; char slice[BUFLEN]; int status = 0; pid_t pid; int err; int newfd; // if you want you can pass arguments to the program to execute // char *const arguments[] = {EXECFILE, "-v", NULL}; char *const arguments[] = {EXECFILE, NULL}; int father2child_pipefd[2]; int child2father_pipefd[2]; char *read_data = NULL; FILE *retclam; fd_set myset; int x=1; signal(SIGPIPE, SIG_IGN); newfd = dup(0); input = fdopen(newfd, "r"); pipe(father2child_pipefd); // Father speaking to child pipe(child2father_pipefd); // Child speaking to father pid = fork(); if (pid > 0) { // Father close(father2child_pipefd[0]); close(child2father_pipefd[1]); // Write to the pipe reading from stdin retclam = fdopen(child2father_pipefd[0], "r"); // set the two fd non blocking //set_nonblock(0); //set_nonblock(child2father_pipefd[0]); //set_nonblock(fileno(retclam)); while(x==1) { // clear the file descriptor set FD_ZERO(&myset); // add the stdin to the set FD_SET(fileno(input), &myset); // add the child pipe to the set FD_SET(fileno(retclam), &myset); // here we wait for data to arrive from stdin or from the child pipe. The last argument is a timeout, if you like err = select(fileno(retclam)+1, &myset, NULL, NULL, NULL); switch(err) { case -1: // Problem with select(). The errno variable knows why //exit(1); x=0; break; case 0: // timeout on select(). Data did not arrived in time, only valid if the last attribute of select() was specified break; default: // data is ready to be read bzero(slice, BUFLEN); if (FD_ISSET(fileno(retclam), &myset)) { // data ready on the child //set_block(fileno(retclam)); read_data = fgets(slice, BUFLEN, retclam); // read a line from the child (max BUFLEN bytes) //set_nonblock(fileno(retclam)); if (read_data == NULL) { //exit(0); x=0; break; } // write data back to stdout write (1, slice, strlen(slice)); if(feof(retclam)) { //exit(0); x=0; break; } break; } bzero(slice, BUFLEN); if (FD_ISSET(fileno(input), &myset)) { // data ready on stdin //printf("father\n"); //set_block(fileno(input)); read_data = fgets(slice, BUFLEN, input); // read a line from stdin (max BUFLEN bytes) //set_nonblock(fileno(input)); if (read_data == NULL) { //exit (0); close(father2child_pipefd[1]); waitpid(pid, &status, 0); //fclose(input); break; } // write data to the child write (father2child_pipefd[1], slice, strlen(slice)); /* if(feof(input)) { exit(0); }*/ break; } } } close(father2child_pipefd[1]); fclose(input); fsync(1); waitpid(pid, &status, 0); // child process terminated fclose (retclam); // Parse output data from child // write (1, "you can append somethind else on stdout if you like"); if (WEXITSTATUS(status) == 0) { exit (0); // child process exited successfully } } if (pid == 0) { // Child close (0); // stdin is not needed close (1); // stdout is not needed // Close the write side of this pipe close(father2child_pipefd[1]); // Close the read side of this pipe close(child2father_pipefd[0]); // Let read on stdin, but this stdin is associated to the read pipe dup2(father2child_pipefd[0], 0); // Let speak on stdout, but this stdout is associated to the write pipe dup2(child2father_pipefd[1], 1); // if you like you can put something back to the father before execve //write (child2father_pipefd[1], "something", 9); //fsync(child2father_pipefd[1]); err = execve(EXECFILE, arguments, NULL); // we'll never be here again after execve succeeded!! So we get here only if the execve() failed //fprintf(stderr, "Problem executing file %s: %i: %s\n", EXECFILE, err, strerror(errno)); exit (1); } if (pid < 0) { // Error exit (1); } fclose(input); return 0; } 
+1
source

I am using 2-way io in bash as follows:

 mkfifo hotleg mkfifo coldleg program <coldleg |tee hotleg & while read LINE; do case $LINE in *)call_a_function $LINE;; esac done <hotleg |tee coldleg & 

(note that you can simply ">" instead of tee, but you can see the result first)

0
source

It is assumed that your hypothesis that the buffered I / O is to blame is most likely true. The way you wrote your loop, the reading will be blocked until it fills the necessary buffer, and you cannot process any input until it returns. This can easily cause a dead end.

Popen.communicate deals with this by creating a stream to work with each channel and making sure that all data should be written to stdin, so that the actual record cannot be delayed while the file object is waiting for a buffer to fill in or for a file object that must be reset / closed. I think you could make a decision related to the work of threads if you need to, but this is not very asynchronous and probably not the easiest solution.

You can bypass python buffering without using the file objects provided by Popen for accessing pipes, and instead grab them fd using the fileno () method. Then you can use fd with os.read, os.write and select.select. The os.read and os.write functions will not perform buffering, but they will block until at least one byte is read. Before calling, make sure that the channel is read / write. The easiest way to do this is to use select.select () to wait for all the tubes you want to read / write, and make one read or write call for each channel, which will be ready when select () is selected. You should be able to find examples of selected loops when searching (they will probably use sockets instead of pipes, but the principle is the same). (Also, never read or write without first checking that it will not block, or you may encounter situations where you cause a deadlock with a child process. You must be prepared to read the data, even if you don’t have everything everything you want is written.)

0
source

If you need to manage a Python interpreter session, you'll probably be better off with

Btw in the latter case, the server can be launched anywhere, and PyScripter already has a working server module (the client module is in Pascal, you need to translate it).

0
source

Source: https://habr.com/ru/post/1399583/


All Articles