I am trying to create a file-like object that can be assigned to sys.stdout / sys.stderr during testing to provide deterministic output. It doesn't need to be fast, just reliable. What I have almost works, but I need help getting rid of the last few edge-case errors.
Here is my current implementation.
```python
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
from os import getpid

class MultiProcessFile(object):
    """
    helper for testing multiprocessing

    multiprocessing poses a problem for doctests, since the strategy of
    replacing sys.stdout/stderr with file-like objects then inspecting
    the results won't work: the child processes will write to the
    objects, but the data will not be reflected in the parent
    doctest-ing process.

    The solution is to create file-like objects which will interact
    with multiprocessing in a more desirable way. All processes can
    write to this object, but only the creator can read. This allows
    the testing system to see a unified picture of I/O.
    """
    def __init__(self):
```
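The listing cuts off at `__init__`, so in outline the rest of the class works like this minimal sketch: every process `write()`s `(pid, data)` pairs into a shared `multiprocessing.Queue`, and `buffer()`, which only the creating process may use, drains the queue into a `StringIO` sorted by pid. The details beyond what the `<pid> wrote:` output format implies are illustrative, not the exact code:

```python
from collections import defaultdict
from multiprocessing import Queue
from os import getpid
from Queue import Empty  # the thread-level Queue module, for its Empty exception
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

class MultiProcessFile(object):
    def __init__(self):
        self.__master = getpid()    # only this pid may read back
        self.__queue = Queue()      # shared between parent and children
        self.__buffer = StringIO()  # parent-side accumulator
        self.softspace = 0          # the print statement expects this

    def write(self, data):
        # called from any process; just funnel the data into the queue
        self.__queue.put((getpid(), data))

    def flush(self):
        pass  # the queue's feeder thread does the real flushing

    def buffer(self):
        # drain the queue into the local buffer, grouped by writer pid
        if getpid() != self.__master:
            return
        cache = defaultdict(str)
        while True:
            try:
                pid, data = self.__queue.get_nowait()
            except Empty:
                break
            cache[pid] += data
        # sorting by pid gives a deterministic order -- except when
        # PIDs wrap around (the third problem below)
        for pid in sorted(cache):
            self.__buffer.write('%d wrote: %r\n' % (pid, cache[pid]))

    def getvalue(self):
        self.buffer()
        return self.__buffer.getvalue()
```

The QueueFeederThread named in the tracebacks below is the background thread that multiprocessing.Queue starts in each writing process to push put() data through the underlying pipe.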
... and a quick test script:
```python
#!/usr/bin/python2.6
from multiprocessing import Process
from mpfile import MultiProcessFile

def printer(msg):
    print msg

processes = []
for i in range(20):
    processes.append(
        Process(target=printer, args=(i,), name='printer')
    )

print 'START'
import sys
buffer = MultiProcessFile()
sys.stdout = buffer

for p in processes:
    p.start()
for p in processes:
    p.join()

for i in range(20):
    print i,
print

sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
print
print 'DONE'
print buffer.buffer()
print buffer.getvalue()
```
This works perfectly 95% of the time, but it has three edge-case problems. I have to run the test script in a fast while loop to reproduce them.
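The reproduction loop looks something like this (`test_mpfile.py` stands in for wherever the test script above is saved); it catches the stray traceback directly, while the other problems have to be spotted in the captured stdout:

```python
# Re-run the test script until a run misbehaves. Only the stray
# traceback shows up on stderr; the truncation and ordering problems
# have to be spotted in the captured stdout.
import subprocess

while True:
    p = subprocess.Popen(['python2.6', 'test_mpfile.py'],  # filename assumed
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    if err:
        print out
        print err
        break
```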
- 3% of the time, the parent process's output is not fully reflected. I assume this is because the data is read before the queue's flushing thread has caught up, and I have no way to wait for that thread without risking deadlock (see the sketch after this list).
- .5% of the time, there is a traceback from the multiprocessing.Queue implementation.
- .01% of the time, the PIDs wrap around, so sorting by PID gives the wrong order.
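On that first point: the only wait hooks I know of on multiprocessing.Queue are close() and join_thread(), and they don't fit here, since the queue is unusable afterwards:

```python
from multiprocessing import Queue

# Illustration of why close()/join_thread() don't solve the first
# problem: they do wait for the feeder thread, but only as a one-shot
# shutdown.
q = Queue()
q.put('data')
q.close()        # no further put() is allowed from this process
q.join_thread()  # blocks until the feeder thread has flushed to the pipe
# q.put('more') would now raise, so this can't be used to synchronize
# a long-lived stdout replacement between writes.
```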
In the worst-case scenario (odds: one in 70 million), the output looks like this:
```
START
DONE
302 wrote: '19\n'
32731 wrote: '0 1 2 3 4 5 6 7 8 '
32732 wrote: '0\n'
32734 wrote: '1\n'
32735 wrote: '2\n'
32736 wrote: '3\n'
32737 wrote: '4\n'
32738 wrote: '5\n'
32743 wrote: '6\n'
32744 wrote: '7\n'
32745 wrote: '8\n'
32749 wrote: '9\n'
32751 wrote: '10\n'
32752 wrote: '11\n'
32753 wrote: '12\n'
32754 wrote: '13\n'
32756 wrote: '14\n'
32757 wrote: '15\n'
32759 wrote: '16\n'
32760 wrote: '17\n'
32761 wrote: '18\n'
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
  File "/usr/lib/python2.6/threading.py", line 484, in run
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
```
In Python 2.7, the exception is slightly different:
```
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 505, in run
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
<type 'exceptions.IOError'>: [Errno 32] Broken pipe
```
How can I get rid of these edge cases?