Here is an MVar implementation specific to byte arrays (be sure to add your own package definition). From here it is trivial to write the input stream over the combined streams. I can send this too if required.
import java.nio.ByteBuffer; public final class MVar { private static enum State { EMPTY, ONE, MANY } private final Object lock; private State state; private byte b; private ByteBuffer bytes; private int length; public MVar() { lock = new Object(); state = State.EMPTY; } public final void put(byte b) { synchronized (lock) { while (state != State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } this.b = b; state = State.ONE; lock.notifyAll(); } } public final void put(byte[] bytes, int offset, int length) { if (length == 0) { return; } synchronized (lock) { while (state != State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } this.bytes = ByteBuffer.allocateDirect(length); this.bytes.put(bytes, offset, length); this.bytes.position(0); this.length = length; state = State.MANY; lock.notifyAll(); } } public final byte take() { synchronized (lock) { while (state == State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } switch (state) { case ONE: { state = State.EMPTY; byte b = this.b; lock.notifyAll(); return b; } case MANY: { byte b = bytes.get(); state = --length <= 0 ? State.EMPTY : State.MANY; lock.notifyAll(); return b; } default: throw new AssertionError(); } } } public final int take(byte[] bytes, int offset, int length) { if (length == 0) { return 0; } synchronized (lock) { while (state == State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } switch (state) { case ONE: bytes[offset] = b; state = State.EMPTY; lock.notifyAll(); return 1; case MANY: if (this.length > length) { this.bytes.get(bytes, offset, length); this.length = this.length - length; synchronized (lock) { lock.notifyAll(); } return length; } this.bytes.get(bytes, offset, this.length); this.bytes = null; state = State.EMPTY; length = this.length; lock.notifyAll(); return length; default: throw new AssertionError(); } } } }
ScootyPuff Mar 12 2018-10-12 14:47
source share