Exception propagation in PipedInputStream and PipedOutputStream

I have a data producer that runs in a separate thread and pushes the generated data into a PipedOutputStream, which is connected to a PipedInputStream. A reference to this input stream is exposed through a public API so that any client can use it. The PipedInputStream has a limited buffer which, when full, blocks the data producer. In effect, new data is generated by the producer as the client reads data from the input stream.

The problem is that the data producer may fail and throw an exception. But since the producer runs in a separate thread from the consumer, there is no good way to get that exception to the client.

What I do at the moment is catch this exception and close the input stream. This results in an IOException with the message "Pipe closed" on the client side, but I would really like to give the client the true reason for the failure.

This is the rough code of my API:

public InputStream getData() {
    final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
    final PipedOutputStream outputStream = new PipedOutputStream(inputStream);

    Thread thread = new Thread(() -> {
        try {
          // Start producing the data and push it into output stream.
          // The production may fail and throw an Exception with the reason
        } catch (Exception e) {
            try {
                // What to do here?
                outputStream.close();
                inputStream.close();
            } catch (IOException e1) {
            }
        }
    });
    thread.start();

    return inputStream;
}

I have two ideas on how to fix this:

  • Store the exception in the parent object and expose it to the client through the API. I.e., if reading fails with an IOException, the client can ask the API for the reason.
  • Extend or reimplement the piped streams so that I can pass a reason to the close() method. The IOException thrown by the stream could then contain this reason as its message.
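
As a rough sketch of the first idea (the class name DataFeed, the method getFailure() and the simulated failure are all made up for illustration, they are not part of my real API): the producer records its exception before closing its end of the pipe, so a client that reaches end of stream can then ask the API whether the data is complete. Note that in this sketch only the output side is closed, so the client sees a normal end of stream rather than an IOException and has to check explicitly.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical API object; DataFeed and getFailure() are illustrative names only.
final class DataFeed {
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    InputStream getData() throws IOException {
        final PipedInputStream in = new PipedInputStream();
        final PipedOutputStream out = new PipedOutputStream(in);
        Thread producer = new Thread(() -> {
            try {
                out.write(42); // stand-in for the real data production
                throw new IllegalStateException("simulated producer failure");
            } catch (Exception e) {
                failure.set(e); // record the reason before the pipe is closed
            } finally {
                try {
                    out.close(); // the reader sees end of stream after this
                } catch (IOException ignored) {
                    // nothing useful to do if closing the pipe fails
                }
            }
        });
        producer.start();
        return in;
    }

    // Empty if the producer has not failed (so far).
    Optional<Exception> getFailure() {
        return Optional.ofNullable(failure.get());
    }
}
```

The drawback is visible here: the client has to remember to call getFailure() after reading, which is exactly what I would like to avoid.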

Any better ideas?

+4
2 answers

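I had a similar requirement, in my case with GZip. Instead of returning the PipedInputStream directly, I wrap it in a FilterInputStream subclass that rethrows the producer's exception when the stream is closed.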

final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
    try {
      // Start producing the data and push it into output stream.
      // The production may fail and throw an Exception with the reason
    } catch (final IOException e) {
        inWithException.fail(e);
    } finally {
        inWithException.countDown();
    }
});
thread.start();
return inWithException;

where InputStreamWithFinalExceptionCheck is:

private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
    private final AtomicReference<IOException> exception = new AtomicReference<>(null);
    private final CountDownLatch complete = new CountDownLatch(1);

    public InputStreamWithFinalExceptionCheck(final InputStream stream) {
        super(stream);
    }

    @Override
    public void close() throws IOException {
        try {
            complete.await();
            final IOException e = exception.get();
            if (e != null) {
                throw e;
            }
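        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IOException("Interrupted while waiting for synchronised closure", e);
        } finally {
            in.close(); // "in" is the wrapped stream field of FilterInputStream
        }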
    }

    public void fail(final IOException e) {
        // Preconditions is Guava's com.google.common.base.Preconditions
        exception.set(Preconditions.checkNotNull(e));
    }

    public void countDown() {complete.countDown();}
}
+2

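This is my implementation, based on the answer above (fooobar.com/questions/1615690/...). I do not use the CountDownLatch complete.await(), because it would cause a deadlock if the InputStream were closed abruptly before the writer had finished writing the full content. I still record the exception caught while the PipedOutputStream is in use, and I create the PipedOutputStream in the spawned thread with a try-with-resources block to make sure it gets closed, waiting in the Supplier until the 2 streams are connected.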

Supplier<InputStream> streamSupplier = new Supplier<InputStream>() {
        @Override
        public InputStream get() {
            final AtomicReference<IOException> osException = new AtomicReference<>();
            final CountDownLatch piped = new CountDownLatch(1);

            final PipedInputStream is = new PipedInputStream();

            FilterInputStream fis = new FilterInputStream(is) {
                @Override
                public void close() throws IOException {
                    try {
                        IOException e = osException.get();
                        if (e != null) {
                            //Exception thrown by the write will bubble up to InputStream reader
                            throw new IOException("IOException in writer", e);
                        }
                    } finally {
                        super.close();
                    }
                };
            };

            Thread t = new Thread(() -> {
                    try (PipedOutputStream os = new PipedOutputStream(is)) {
                        piped.countDown();
                        writeIozToStream(os, projectFile, dataFolder);
                    } catch (final IOException e) {
                        osException.set(e);
                    }
            });
            t.start();

            try {
                piped.await();
            } catch (InterruptedException e) {
                t.interrupt(); // Thread has no cancel() method
                Thread.currentThread().interrupt();
            }

            return fis;
        }
    };

The calling code looks something like this:

try (InputStream is = streamSupplier.get()) {
     //Read stream in full 
}

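This way, when the InputStream is closed, the writer side is able to close the PipedOutputStream properly; if the reader closes early, the writer eventually gets a "Pipe closed" IOException.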

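If I kept the complete.await() line in the FilterInputStream close(), I could run into a deadlock (the reader trying to close the PipedInputStream is stuck in complete.await(), while the PipedOutputStream waits forever in PipedInputStream awaitSpace).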
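
For completeness, here is a minimal self-contained sketch of the same idea that can be run as is. The class name PipeFailureDemo, the produce() helper and the simulated failure are assumptions made up for the example. One difference from the code above: the failure is recorded in the catch block before the output stream is closed in finally, so a reader that has seen end of stream is guaranteed to also see the recorded exception when it calls close(). With try-with-resources the stream is closed before the catch block runs, which leaves a small window where close() on the reader side could find no exception yet.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

final class PipeFailureDemo {

    // Hypothetical producer: writes a few bytes, then fails.
    static void produce(PipedOutputStream out) throws IOException {
        out.write("partial data".getBytes(StandardCharsets.UTF_8));
        throw new IOException("simulated producer failure");
    }

    static InputStream getData() throws IOException {
        final PipedInputStream pipeIn = new PipedInputStream();
        final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
        final AtomicReference<IOException> failure = new AtomicReference<>();

        Thread producer = new Thread(() -> {
            try {
                produce(pipeOut);
            } catch (IOException e) {
                failure.set(e); // recorded before the pipe is closed below
            } finally {
                try {
                    pipeOut.close(); // signals end of stream to the reader
                } catch (IOException ignored) {
                    // nothing useful to do if closing the pipe fails
                }
            }
        });
        producer.start();

        return new FilterInputStream(pipeIn) {
            @Override
            public void close() throws IOException {
                try {
                    IOException e = failure.get();
                    if (e != null) {
                        throw new IOException("Producer failed", e);
                    }
                } finally {
                    super.close();
                }
            }
        };
    }

    // Reads to end of stream; returns the producer's message, or null on success.
    static String readAndReport() {
        try (InputStream in = getData()) {
            while (in.read() != -1) {
                // discard the data, only the outcome matters here
            }
        } catch (IOException e) {
            return e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(readAndReport());
    }
}
```

The guarantee only holds for a client that reads until end of stream. A client that closes the stream early may still close it before the producer has failed, and then gets no exception from close().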

+1

Source: https://habr.com/ru/post/1615690/

