Python multiprocessing: file object synchronization

I am trying to create a file-like object to be assigned to sys.stdout / sys.stderr during testing, in order to provide deterministic output. It doesn't need to be fast, just reliable. What I have so far almost works, but I need some help getting rid of the last few edge-case errors.

Here is my current implementation.

    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    from os import getpid

    class MultiProcessFile(object):
        """
        helper for testing multiprocessing

        multiprocessing poses a problem for doctests, since the strategy
        of replacing sys.stdout/stderr with file-like objects then
        inspecting the results won't work: the child processes will write
        to the objects, but the data will not be reflected in the parent
        doctest-ing process.

        The solution is to create file-like objects which will interact
        with multiprocessing in a more desirable way.

        All processes can write to this object, but only the creator can
        read. This allows the testing system to see a unified picture of
        I/O.
        """
        def __init__(self):
            # per advice at:
            # http://docs.python.org/library/multiprocessing.html#all-platforms
            from multiprocessing import Queue
            self.__master = getpid()
            self.__queue = Queue()
            self.__buffer = StringIO()
            self.softspace = 0

        def buffer(self):
            if getpid() != self.__master:
                return
            from Queue import Empty
            from collections import defaultdict
            cache = defaultdict(str)
            while True:
                try:
                    pid, data = self.__queue.get_nowait()
                except Empty:
                    break
                cache[pid] += data
            for pid in sorted(cache):
                self.__buffer.write('%s wrote: %r\n' % (pid, cache[pid]))

        def write(self, data):
            self.__queue.put((getpid(), data))

        def __iter__(self):
            "getattr doesn't work for iter()"
            self.buffer()
            return self.__buffer

        def getvalue(self):
            self.buffer()
            return self.__buffer.getvalue()

        def flush(self):
            "meaningless"
            pass

... and a quick test script:

    #!/usr/bin/python2.6
    from multiprocessing import Process
    from mpfile import MultiProcessFile

    def printer(msg):
        print msg

    processes = []
    for i in range(20):
        processes.append(
            Process(target=printer, args=(i,), name='printer')
        )

    print 'START'
    import sys
    buffer = MultiProcessFile()
    sys.stdout = buffer

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    for i in range(20):
        print i,
    print

    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    print
    print 'DONE'
    print
    buffer.buffer()
    print buffer.getvalue()

This works perfectly 95% of the time, but it has three edge-case problems. I have to run the test script in a fast while loop to reproduce them (a sketch of such a loop follows the list below).

  • 3% of the time, the output of the parent process is not fully reflected. I assume this is because the data is being consumed before the queue-flushing thread can catch up. I haven't thought of a way to wait for the thread without deadlocking.
  • .5% of the time, there is a traceback from the multiprocessing.Queue implementation.
  • .01% of the time, the PIDs wrap around, and so sorting by PID gives the wrong ordering.
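
To reproduce them, a driver along these lines works. This is a minimal sketch, assuming the test script above has been saved as test_mpfile.py (the filename is hypothetical): it reruns the script until the output diverges from a reference run.

    # Hypothetical reproduction harness: rerun the test script until its
    # output differs from the output of the first run.
    import subprocess

    def run():
        proc = subprocess.Popen(['python', 'test_mpfile.py'],
                                stdout=subprocess.PIPE)
        return proc.communicate()[0]

    reference = run()
    runs = 1
    while True:
        output = run()
        runs += 1
        if output != reference:
            print 'output diverged after %d runs:' % runs
            print output
            break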

In the very worst case (odds: one in 70 million), the output looks like this:

    START
    DONE
    302 wrote: '19\n'
    32731 wrote: '0 1 2 3 4 5 6 7 8 '
    32732 wrote: '0\n'
    32734 wrote: '1\n'
    32735 wrote: '2\n'
    32736 wrote: '3\n'
    32737 wrote: '4\n'
    32738 wrote: '5\n'
    32743 wrote: '6\n'
    32744 wrote: '7\n'
    32745 wrote: '8\n'
    32749 wrote: '9\n'
    32751 wrote: '10\n'
    32752 wrote: '11\n'
    32753 wrote: '12\n'
    32754 wrote: '13\n'
    32756 wrote: '14\n'
    32757 wrote: '15\n'
    32759 wrote: '16\n'
    32760 wrote: '17\n'
    32761 wrote: '18\n'
    Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
    Traceback (most recent call last):
      File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
      File "/usr/lib/python2.6/threading.py", line 484, in run
      File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed
    <type 'exceptions.TypeError'>: 'NoneType' object is not callable

In Python 2.7, the exception is slightly different:

    Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
      File "/usr/lib/python2.7/threading.py", line 505, in run
      File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    <type 'exceptions.IOError'>: [Errno 32] Broken pipe

How can I get rid of these edge cases?

2 answers

The solution came in two parts. I have successfully run the test program 200 thousand times without any change in the output.

The easy part was to use multiprocessing.current_process()._identity to sort the messages. This is not part of the published API, but it is a unique, deterministic identifier of each process. This fixed the problem of PIDs wrapping around and giving a bad ordering of output.
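
As a standalone illustration (not from the original answer), this is the kind of identifier _identity provides:

    # Demonstration of the undocumented _identity attribute: each process
    # gets a unique, deterministic tuple that sorts in creation order.
    from multiprocessing import Process, current_process

    def show():
        print current_process()._identity   # (1,), (2,), ... in children

    if __name__ == '__main__':
        print current_process()._identity   # () in the parent process
        procs = [Process(target=show) for _ in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

Tuples compare element-wise, so the parent's empty tuple sorts first and the children sort in the order they were created, regardless of which PIDs the OS hands out.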

The other part of the solution was to use multiprocessing.Manager().Queue() rather than multiprocessing.Queue (see the sketch below). This fixes problem #2 above, because the manager lives in a separate process and so avoids some of the bad special cases that arise when using a Queue from the owning process. #3 is fixed because the Queue is fully exhausted and the feeder thread dies naturally before Python starts shutting down and closes stdin.
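
A minimal sketch of that change, assuming everything except __init__ stays as in the question's class:

    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    from os import getpid
    from multiprocessing import Manager

    class MultiProcessFile(object):
        # all other methods unchanged from the question

        def __init__(self):
            self.__master = getpid()
            # The Manager runs the queue in its own server process, so
            # this process has no QueueFeederThread that could die during
            # interpreter shutdown.
            self.__manager = Manager()  # hold a reference to keep the manager alive
            self.__queue = self.__manager.Queue()
            self.__buffer = StringIO()
            self.softspace = 0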


I have encountered far fewer multiprocessing bugs with Python 2.7 than with Python 2.6. Having said that, the solution I used to avoid the "Exception in thread QueueFeederThread" problem is to sleep momentarily, possibly for 0.01s, in each process in which a Queue is used (see the sketch below). It is true that using sleep is neither desirable nor even reliable, but the specified duration was observed to work sufficiently well in practice for me. You can also try 0.1s.
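
Applied to the question's test script, the workaround would look something like this (a sketch; 0.01s is the value that happened to work for me, not a guaranteed figure):

    # Sketch of the sleep workaround: pause briefly before each
    # queue-using process exits, so the queue's feeder thread can finish
    # flushing before interpreter shutdown begins.
    import time

    def printer(msg):
        print msg
        time.sleep(0.01)  # try 0.1 if the race still occurs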

