Asynchronous Subprocess on Windows

First of all, the general problem that I am solving is a bit more complicated than what I am showing here, so please do not tell me to “use threads with blocking”, as this will not solve my real situation without fair, FAIR-bit rewriting and refactoring.

I have several applications that are not mine for change that take data from stdin and push it to stdout after their magic. My task is to link several of these programs. The problem is that sometimes they choke, and so I need to track their progress, which is displayed on STDERR.

pA = subprocess.Popen(CommandA, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # ... some more processes make up the chain, but that is irrelevant to the problem pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout ) 

Now, reading directly through pA.stdout.readline () and pB.stdout.readline () or the plain read () function is a blocking problem. Since different applications are displayed in different steps and in different formats, locking is not an option. (And, as I wrote above, streams are not an option if in the final, last resort.) pA.communicate() is safe from the point of view of deadlocks, but since I need live information, this is not an option either.

So google led me to this on ActiveState.

At first everything is fine until I implement it. Comparing the output of cmd.exe pA.exe | pB.exe pA.exe | pB.exe , ignoring the fact that both outputs in the same window create a mess, I see very instant updates. However, I am implementing the same thing using the above snippet and the read_some() function declared there, and it takes more than 10 seconds to notify read_some() of updates to one channel. But when this happens, he has updates that lead to success, for example, up to 40%.

Thus, I do some more research and see numerous topics regarding PeekNamedPipe, anonymous descriptors and returns 0 bytes, even if there is information available in the pipe. Since the subject proved that he far surpasses my experience for fixing or coding, I came to Stack Overflow to find a guide. :)

My W7 platform is 64-bit with Python 2.6, 32-bit applications, if that matters, and Unix compatibility is not a concern. I can even deal with the full ctypes or pywin32 solution, which completely undermines the subprocess if it is the only solution, if I can read from any stderr pipe asynchronously with immediate performance and without deadlocks. :)

+4
source share
4 answers

How bad is it to use streams? I ran into the same problem and ended up deciding to use streams to collect all the data based on the stdout and stderr sub-processes and put it in a thread-safe queue that the main thread can read in a blocking way, without having to worry about the threads happening behind backstage.

It is unclear what kind of problems you expect with a solution based on threads and locking. Are you worried about making the rest of the code safe? This should not be a problem since the I / O stream should not interact with the rest of your code or data. If you have very strict memory requirements, or your pipeline is especially long, you might feel miserable to have created so many threads. I don’t know enough about your situation, so I can’t say if this can be a problem, but it seems to me that since you are already creating additional processes, several threads for interacting with them should not be a terrible load. In my situation, I did not find these I / O topics particularly problematic.

My stream function looked something like this:

 def simple_io_thread(pipe, queue, tag, stop_event): """ Read line-by-line from pipe, writing (tag, line) to the queue. Also checks for a stop_event to give up before the end of the stream. """ while True: line = pipe.readline() while True: try: # Post to the queue with a large timeout in case the # queue is full. queue.put((tag, line), block=True, timeout=60) break except Queue.Full: if stop_event.isSet(): break continue if stop_event.isSet() or line=="": break pipe.close() 

When I run the subprocess, I do this:

 outputqueue = Queue.Queue(50) stop_event = threading.Event() process = subprocess.Popen( command, cwd=workingdir, env=env, shell=useshell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stderr_thread = threading.Thread( target=simple_io_thread, args=(process.stderr, outputqueue, "STDERR", stop_event) ) stdout_thread = threading.Thread( target=simple_io_thread, args=(process.stdout, outputqueue, "STDOUT", stop_event) ) stderr_thread.daemon = True stdout_thread.daemon = True stderr_thread.start() stdout_thread.start() 

Then, when I want to read, I can simply block the output at the output - each element read from it contains either a line to determine which channel it comes from, and a line of text from this channel. Very small code works in a separate thread, and it only communicates with the main thread through a thread-safe queue (plus an event in case I need to refuse early). Perhaps this approach would be useful and allow you to solve the thread and lock problem, but without having to rewrite a lot of code?

(My solution gets more complicated because I sometimes want to finish the subprocesses earlier and want to make sure that the threads all run out. If this is not a problem, you can get rid of all the stop_event stuff and it will become quite compressed.)

+4
source

I assume that the process pipeline will not be inhibited if it uses only stdin and stdout; and the problem you are trying to solve is how to make it non-deadlock if they write stderr (and have to deal with stderr, possibly backup).

If you allow multiple processes to write to stderr, you should watch for their mixing. I guess you figured it out that way; just putting it there to be sure.

Keep in mind the -u flag for python; this is useful when testing to check if OS buffering is causing you to.

If you want to emulate select () on file descriptors in win32, your only choice is to use PeekNamedPipe () and friends. I have a piece of code that reads linear output from several processes at the same time, which you can even use directly - try passing it a list of proc.stderr handlers and go.

 class NoLineError(Exception): pass class NoMoreLineError(Exception): pass class LineReader(object): """Helper class for multi_readlines.""" def __init__(self, f): self.fd = f.fileno() self.osf = msvcrt.get_osfhandle(self.fd) self.buf = '' def getline(self): """Returns a line of text, or raises NoLineError, or NoMoreLineError.""" try: _, avail, _ = win32pipe.PeekNamedPipe(self.osf, 0) bClosed = False except pywintypes.error: avail = 0 bClosed = True if avail: self.buf += os.read(self.fd, avail) idx = self.buf.find('\n') if idx >= 0: ret, self.buf = self.buf[:idx+1], self.buf[idx+1:] return ret elif bClosed: if self.buf: ret, self.buf = self.buf, None return ret else: raise NoMoreLineError else: raise NoLineError def multi_readlines(fs, timeout=0): """Read lines from |fs|, a list of file objects. The lines come out in arbitrary order, depending on which files have output available first.""" if type(fs) not in (list, tuple): raise Exception("argument must be a list.") objs = [LineReader(f) for f in fs] for i,obj in enumerate(objs): obj._index = i while objs: yielded = 0 for i,obj in enumerate(objs): try: yield (obj._index, obj.getline()) yielded += 1 except NoLineError: #time.sleep(timeout) pass except NoMoreLineError: del objs[i] break # Because we mutated the array if not yielded: time.sleep(timeout) pass 

I have never seen "Peek return 0 bytes, even though the data is available." If this happens to others, I am sure their libc will buffer their stdout / stderr before sending data to the OS; you cannot do anything from without. You must force the application to use unbuffered output in some way (-u in python; win32 / libc calls to change the file descriptor stderr ...)

The fact that you don’t see anything, and then a lot of updates, makes me think that your problem is buffered at the original end. win32 libc can buffer differently if it writes to a pipe and not to the console. Again, the best you can do from outside these programs is to aggressively deplete their output.

+1
source

How about using Twisted FD? http://twistedmatrix.com/documents/8.1.0/api/twisted.internet.fdesc.html

It is not asynchronous, but non-blocking. For asynchronous use, can you use the Twisted port?

0
source

Can't you just do non-blocking reads from pA.stdout and pB.stdout? Will you have a closed employment cycle?

Non-blocking reading on .PIPE subprocess in python

0
source

Source: https://habr.com/ru/post/1305677/


All Articles