I'm currently trying to write my own thread proxy (let me call it that), which can change the contents of a given input stream and produce a modified output if necessary. This requirement is really necessary, because sometimes I have to change the flows in my application (for example, compress data really "on the fly"). The following class is quite simple and uses internal buffering.
private static class ProxyInputStream extends InputStream { private final InputStream iStream; private final byte[] iBuffer = new byte[512]; private int iBufferedBytes; private final ByteArrayOutputStream oBufferStream; private final OutputStream oStream; private byte[] oBuffer = emptyPrimitiveByteArray; private int oBufferIndex; ProxyInputStream(InputStream iStream, IFunction<OutputStream, ByteArrayOutputStream> oStreamFactory) { this.iStream = iStream; oBufferStream = new ByteArrayOutputStream(512); oStream = oStreamFactory.evaluate(oBufferStream); } @Override public int read() throws IOException { if ( oBufferIndex == oBuffer.length ) { iBufferedBytes = iStream.read(iBuffer); if ( iBufferedBytes == -1 ) { return -1; } oBufferIndex = 0; oStream.write(iBuffer, 0, iBufferedBytes); oStream.flush(); oBuffer = oBufferStream.toByteArray(); oBufferStream.reset(); } return oBuffer[oBufferIndex++]; } }
Suppose we also have a test output stream that simply adds a space before each byte written ("abc" → "abc") as follows:
private static class SpacingOutputStream extends OutputStream { private final OutputStream outputStream; SpacingOutputStream(OutputStream outputStream) { this.outputStream = outputStream; } @Override public void write(int b) throws IOException { outputStream.write(' '); outputStream.write(b); } }
And the following testing method:
private static void test(final boolean useDeflater) throws IOException { final FileInputStream input = new FileInputStream(SOURCE); final IFunction<OutputStream, ByteArrayOutputStream> outputFactory = new IFunction<OutputStream, ByteArrayOutputStream>() { @Override public OutputStream evaluate(ByteArrayOutputStream outputStream) { return useDeflater ? new DeflaterOutputStream(outputStream) : new SpacingOutputStream(outputStream); } }; final InputStream proxyInput = new ProxyInputStream(input, outputFactory); final OutputStream output = new FileOutputStream(SOURCE + ".~" + useDeflater); int c; while ( (c = proxyInput.read()) != -1 ) { output.write(c); } output.close(); proxyInput.close(); }
This test method simply reads the contents of the file and writes it to another stream, which can probably be modified in some way. If the testing method works with useDeflater=false , the expected approach works fine, as expected. But if the test method is called with useDeflater set, it behaves really strange and just doesn't write anything (if you omit the 78 9C header). I suspect that the deflater class cannot be designed to fit the approach I like to use, but I have always believed that the ZIP format and deflate compression are designed to work on the fly.
I’m probably mistaken at some point with the specifics of the deflate compression algorithm. What did I really miss? .. Maybe there might be a different approach to write a “thread proxy” to behave the way I want it to work ... How can I compress data on the fly with streams only ?
Thanks in advance.
UPD: the following basic version works with deflater and an inflatable device:
public final class ProxyInputStream<OS extends OutputStream> extends InputStream { private static final int INPUT_BUFFER_SIZE = 512; private static final int OUTPUT_BUFFER_SIZE = 512; private final InputStream iStream; private final byte[] iBuffer = new byte[INPUT_BUFFER_SIZE]; private final ByteArrayOutputStream oBufferStream; private final OS oStream; private final IProxyInputStreamListener<OS> listener; private byte[] oBuffer = emptyPrimitiveByteArray; private int oBufferIndex; private boolean endOfStream; private ProxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) { this.iStream = iStream; oBufferStream = new ByteArrayOutputStream(OUTPUT_BUFFER_SIZE); oStream = oStreamFactory.evaluate(oBufferStream); this.listener = listener; } public static <OS extends OutputStream> ProxyInputStream<OS> proxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) { return new ProxyInputStream<OS>(iStream, oStreamFactory, listener); } @Override public int read() throws IOException { if ( oBufferIndex == oBuffer.length ) { if ( endOfStream ) { return -1; } else { oBufferIndex = 0; do { final int iBufferedBytes = iStream.read(iBuffer); if ( iBufferedBytes == -1 ) { if ( listener != null ) { listener.afterEndOfStream(oStream); } endOfStream = true; break; } oStream.write(iBuffer, 0, iBufferedBytes); oStream.flush(); } while ( oBufferStream.size() == 0 ); oBuffer = oBufferStream.toByteArray(); oBufferStream.reset(); } } return !endOfStream || oBuffer.length != 0 ? (int) oBuffer[oBufferIndex++] & 0xFF : -1; }
}