diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java index d804b493f..2b7ee5cf9 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java @@ -22,6 +22,7 @@ public class Copier implements Runnable, Closeable { private boolean ready; + private Throwable expection; private long count; private final InputStream input; @@ -32,7 +33,6 @@ public class Copier implements Runnable, Closeable { private final Consumer batchConsumer; private Future future; private final String name; - private final Object notify; @@ -107,26 +107,27 @@ public void run() { } } } catch (IOException ioe) { - if (errorHandler != null) { - errorHandler.accept(this, ioe); - } + expection = ioe; + log.debug(ioe.getMessage()); } catch (Throwable t) { if (! CommandExecutor.isBrokenPipe(t)) { log.warn("{}Connector {}\n{} {}", logPrefix(), toString(), t.getClass().getName(), t.getMessage()); } - if (errorHandler != null) { - errorHandler.accept(this, t); - } - + expection = t; } finally { synchronized (this) { ready = true; + // The copier is ready, but resulted some error, the user requested to be called back, so do that now + if (errorHandler != null && expection != null) { + errorHandler.accept(this, expection); + } log.debug("{}notifying listeners", logPrefix()); } if (callback != null) { callback.accept(this); } synchronized (this) { + // ready now, notify threads waiting notifyAll(); } @@ -142,10 +143,34 @@ public void waitFor() throws InterruptedException { } public boolean isReady() { + try { + return isReadyIOException(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + public boolean isReadyIOException() throws IOException { + if (ready) { + throwIOExceptionIfNeeded(); + } return ready; } + private void throwIOExceptionIfNeeded() throws IOException { + if (expection != null) { + if (expection instanceof IOException) { + throw (IOException) expection; + } else { + throw new IOException(expection); + } + } + } + + /** + * Returns the number of bytes read from the input stream so far + */ public long getCount() { return count; } diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java index 81e7f989a..d10575df7 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java @@ -1,7 +1,6 @@ package nl.vpro.util; -import lombok.Getter; -import lombok.SneakyThrows; +import lombok.*; import lombok.extern.slf4j.Slf4j; import java.io.*; @@ -21,6 +20,8 @@ import org.slf4j.LoggerFactory; import org.slf4j.event.Level; +import com.google.common.annotations.VisibleForTesting; + import nl.vpro.logging.Slf4jHelper; /** @@ -44,6 +45,8 @@ public class FileCachingInputStream extends InputStream { private static final int EOF = -1; private final Copier copier; private final byte[] buffer; + @Getter(AccessLevel.PACKAGE) + @VisibleForTesting private final int bufferLength; @@ -297,34 +300,20 @@ private FileCachingInputStream( @Override public int read() throws IOException { - try { - if (tempFileInputStream == null) { - // the stream was small, we are reading from the memory buffer - return readFromBuffer(); - } else { - return readFromFile(); - } - } catch (IOException ioe) { - close(); - future.completeExceptionally(ioe); - throw ioe; + if (tempFileInputStream == null) { + // the stream was small, we are reading from the memory buffer + return readFromBuffer(); + } else { + return readFromFile(); } } @Override public int read(@NonNull byte[] b) throws IOException { - try { - if (tempFileInputStream == null) { - return readFromBuffer(b); - } else { - return readFromFile(b); - } - } catch (IOException ioe) { - if (! closed) { - close(); - } - future.completeExceptionally(ioe); - throw ioe; + if (tempFileInputStream == null) { + return readFromBuffer(b); + } else { + return readFromFile(b); } } @@ -374,6 +363,11 @@ public String toString() { return super.toString() + " for " + tempFile; } + + /** + * Wait until the copier thread read at least the number of bytes given. + * + */ public synchronized long waitForBytesRead(int atLeast) throws InterruptedException { if (copier != null) { copier.executeIfNotRunning(); @@ -386,15 +380,24 @@ public synchronized long waitForBytesRead(int atLeast) throws InterruptedExcepti } } + /** + * Returns the number of bytes consumed from the input stream so far + */ public long getCount() { return copier == null ? bufferLength : copier.getCount(); } + /** + * If a temp file is used for buffering, you can may obtain it. + */ public Path getTempFile() { return tempFile; } + /** + * One of the paths of {@link #read()}, when it is reading from memory. + */ private int readFromBuffer() { if (count < bufferLength) { int result = buffer[(int) count++]; @@ -408,6 +411,9 @@ private int readFromBuffer() { } } + /** + * One of the paths of {@link #read(byte[])} )}, when it is reading from memory. + */ private int readFromBuffer(byte[] b) { int toCopy = Math.min(b.length, bufferLength - (int) count); if (toCopy > 0) { @@ -419,12 +425,17 @@ private int readFromBuffer(byte[] b) { } } + + /** + * + * See {@link InputStream#read()} This methods must behave exactly according to that. + */ private int readFromFile() throws IOException { copier.executeIfNotRunning(); int result = tempFileInputStream.read(); while (result == EOF) { synchronized (copier) { - while (!copier.isReady() && result == EOF) { + while (!copier.isReadyIOException() && result == EOF) { log.debug("Copier {} not yet ready", copier.logPrefix()); // copier is still busy, wait a second, and try again. try { @@ -437,7 +448,7 @@ private int readFromFile() throws IOException { } result = tempFileInputStream.read(); } - if (copier.isReady() && result == EOF) { + if (copier.isReadyIOException() && result == EOF) { // the copier did not return any new results // don't increase count but return now. return EOF; @@ -452,9 +463,13 @@ private int readFromFile() throws IOException { return result; } + /** + * + * See {@link InputStream#read(byte[])} This methods must behave exactly according to that. + */ private int readFromFile(byte[] b) throws IOException { copier.executeIfNotRunning(); - if (copier.isReady() && count == copier.getCount()) { + if (copier.isReadyIOException() && count == copier.getCount()) { return EOF; } int totalResult = 0; @@ -463,7 +478,7 @@ private int readFromFile(byte[] b) throws IOException { if (totalResult == 0) { - while (!copier.isReady() && totalResult == 0) { + while (!copier.isReadyIOException() && totalResult == 0) { log.debug("Copier {} not yet ready", copier.logPrefix()); try { copier.wait(1000); @@ -473,13 +488,17 @@ private int readFromFile(byte[] b) throws IOException { throw new InterruptedIOException(e.getMessage()); } int subResult = Math.max(tempFileInputStream.read(b, totalResult, b.length - totalResult), 0); - totalResult += subResult; + totalResult += subResult; + } + if (totalResult == 0) { + // I doubt this can happen + return EOF; } } count += totalResult; } - + assert totalResult != 0; //log.debug("returning {} bytes", totalResult); return totalResult; } diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index cd831a672..3b74bdf6d 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -12,7 +12,10 @@ import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.apache.commons.io.IOUtils.EOF; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -39,10 +42,11 @@ public void before(TestInfo testInfo) { FileCachingInputStream.openStreams = 0; } - @Test - public void testRead() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRead(boolean downloadFirst) throws IOException { - try(FileCachingInputStream inputStream = slowReader()) { + try(FileCachingInputStream inputStream = slowReader(downloadFirst)) { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -57,10 +61,11 @@ public void testRead() throws IOException { } - @Test - public void testReadBuffer() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadBuffer(boolean downloadFirst) throws IOException { - try(FileCachingInputStream inputStream = slowReader()) { + try(FileCachingInputStream inputStream = slowReader(downloadFirst)) { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -76,7 +81,6 @@ public void testReadBuffer() throws IOException { } - @Test public void testReadFileGetsBroken() { assertThatThrownBy(() -> { @@ -85,6 +89,7 @@ public void testReadFileGetsBroken() { .builder() .outputBuffer(2) .batchSize(3) + .downloadFirst(false) // if true exception will come from constructor already .batchConsumer((f, c) -> { if (c.getCount() > 300) { // randomly close the file @@ -173,9 +178,8 @@ public void testReadFileGetsInterrupted() throws IOException { assertThat(isInterrupted).withFailMessage("Thread did not get interrupted").isTrue(); } - protected FileCachingInputStream slowReader() { - return - FileCachingInputStream.builder() + protected FileCachingInputStream slowReader(boolean downloadFirst) { + return FileCachingInputStream.builder() .outputBuffer(2) .batchSize(3) .batchConsumer((f, c) -> { @@ -190,6 +194,7 @@ protected FileCachingInputStream slowReader() { .input(new ByteArrayInputStream(MANY_BYTES)) .initialBuffer(4) .startImmediately(true) + .downloadFirst(downloadFirst) .build(); } @@ -251,6 +256,8 @@ public void testReadLargeBuffer() throws IOException { assertThat(out.toByteArray()).containsExactly(MANY_BYTES); } + + @Test public void testSimple() throws IOException { try ( @@ -298,9 +305,10 @@ public void testWithLargeBuffer() throws IOException { .outputBuffer(2) .batchSize(3) .input(new ByteArrayInputStream(HELLO)) - .initialBuffer(2048) + //.initialBuffer(2048) .build()) { + assertThat(inputStream.getBufferLength()).isEqualTo(HELLO.length); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -310,6 +318,26 @@ public void testWithLargeBuffer() throws IOException { } } + @Test + public void testWithLargeBufferByte() throws IOException { + try ( + FileCachingInputStream inputStream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(3) + .input(new ByteArrayInputStream(HELLO)) + .initialBuffer(2048) + .build()) { + + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int n; + while (EOF != (n = inputStream.read())) { + out.write(n); + } + assertThat(out.toByteArray()).containsExactly(HELLO); + } + } + @Test public void testWithBufferEdge() throws IOException { try ( @@ -374,6 +402,89 @@ public void testWaitForBytesOnZeroBytes() throws IOException, InterruptedExcepti } } + @Test + public void ioExceptionFromSourceReadBytes() throws IOException { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); + } + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { + + byte[] buffer = new byte[500]; + + + assertThatThrownBy(() -> { + int read = 0; + int n; + while (EOF != (n = stream.read(buffer))) { + assertThat(n).isNotEqualTo(0); // cannot happen (unless buffer length == 0) according to contract + read += n; + log.debug("Read {}/{}", n, read); + } + } + ) + .isInstanceOf(IOException.class) + .hasMessage("breaking!"); + + assertThat(stream.getFuture()).isCompletedExceptionally(); + assertThat(stream.available()).isEqualTo(0); + } + } + + @Test + public void ioExceptionFromSourceReadByte() throws IOException { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); + } + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(200) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { + + assertThatThrownBy(() -> { + int b; + int read = 0; + while (EOF != (b = stream.read())) { + + read++; + log.debug("Read {}", read); + } + }).isInstanceOf(IOException.class) + .hasMessage("breaking!"); + } + } + + @Test public void createPath() { new File("/tmp/bestaatniet").delete();