From 2315510903284b4be39f93c95880f534bdef2b22 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 11:52:10 +0200 Subject: [PATCH 1/9] MSE-4945 I think test demonstrates the problem. --- .../vpro/util/FileCachingInputStreamTest.java | 50 +++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index cd831a672..02f8c857e 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -6,11 +6,11 @@ import java.nio.channels.ClosedByInterruptException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; +import org.assertj.core.api.AbstractThrowableAssert; import org.junit.jupiter.api.*; import static org.assertj.core.api.Assertions.assertThat; @@ -174,8 +174,7 @@ public void testReadFileGetsInterrupted() throws IOException { } protected FileCachingInputStream slowReader() { - return - FileCachingInputStream.builder() + return FileCachingInputStream.builder() .outputBuffer(2) .batchSize(3) .batchConsumer((f, c) -> { @@ -374,6 +373,49 @@ public void testWaitForBytesOnZeroBytes() throws IOException, InterruptedExcepti } } + @Test + public void ioExceptionFromSource() { + AbstractThrowableAssert o = assertThatThrownBy(() -> { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); + } + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { + + byte[] buffer = new byte[500]; + + int n; + int read = 0; + while (IOUtils.EOF != (n = stream.read(buffer))) { + assertThat(n).isNotEqualTo(0); + read += n; + log.debug("Read {}/{}", n, read); + } + } + }) + .isInstanceOf(IOException.class) + .hasMessage("breaking!"); + ; + + + } + @Test public void createPath() { new File("/tmp/bestaatniet").delete(); From c7d5c2339ca8885a8694968872af2552c9c5c8a0 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 12:11:55 +0200 Subject: [PATCH 2/9] MSE-4945 --- .../nl/vpro/util/FileCachingInputStream.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java index 81e7f989a..e3d6c71b4 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java @@ -313,18 +313,10 @@ public int read() throws IOException { @Override public int read(@NonNull byte[] b) throws IOException { - try { - if (tempFileInputStream == null) { - return readFromBuffer(b); - } else { - return readFromFile(b); - } - } catch (IOException ioe) { - if (! closed) { - close(); - } - future.completeExceptionally(ioe); - throw ioe; + if (tempFileInputStream == null) { + return readFromBuffer(b); + } else { + return readFromFile(b); } } @@ -424,7 +416,7 @@ private int readFromFile() throws IOException { int result = tempFileInputStream.read(); while (result == EOF) { synchronized (copier) { - while (!copier.isReady() && result == EOF) { + while (!copier.isReadyIOException() && result == EOF) { log.debug("Copier {} not yet ready", copier.logPrefix()); // copier is still busy, wait a second, and try again. try { @@ -437,7 +429,7 @@ private int readFromFile() throws IOException { } result = tempFileInputStream.read(); } - if (copier.isReady() && result == EOF) { + if (copier.isReadyIOException() && result == EOF) { // the copier did not return any new results // don't increase count but return now. return EOF; @@ -452,9 +444,13 @@ private int readFromFile() throws IOException { return result; } + /** + * + * @see {@link InputStream#read(byte[])} This methods must behave exactly according to that. + */ private int readFromFile(byte[] b) throws IOException { copier.executeIfNotRunning(); - if (copier.isReady() && count == copier.getCount()) { + if (copier.isReadyIOException() && count == copier.getCount()) { return EOF; } int totalResult = 0; @@ -463,7 +459,7 @@ private int readFromFile(byte[] b) throws IOException { if (totalResult == 0) { - while (!copier.isReady() && totalResult == 0) { + while (!copier.isReadyIOException() && totalResult == 0) { log.debug("Copier {} not yet ready", copier.logPrefix()); try { copier.wait(1000); @@ -473,13 +469,17 @@ private int readFromFile(byte[] b) throws IOException { throw new InterruptedIOException(e.getMessage()); } int subResult = Math.max(tempFileInputStream.read(b, totalResult, b.length - totalResult), 0); - totalResult += subResult; + totalResult += subResult; + } + if (totalResult == 0) { + // I doubt this can happen + return EOF; } } count += totalResult; } - + assert totalResult != 0; //log.debug("returning {} bytes", totalResult); return totalResult; } From d428b51784f9b97f6ffcfe98b148b04ef96c6f3c Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 12:12:03 +0200 Subject: [PATCH 3/9] MSE-4945 --- .../src/main/java/nl/vpro/util/Copier.java | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java index d804b493f..8fbf1338f 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java @@ -22,6 +22,7 @@ public class Copier implements Runnable, Closeable { private boolean ready; + private Throwable errors; private long count; private final InputStream input; @@ -32,7 +33,6 @@ public class Copier implements Runnable, Closeable { private final Consumer batchConsumer; private Future future; private final String name; - private final Object notify; @@ -107,26 +107,27 @@ public void run() { } } } catch (IOException ioe) { - if (errorHandler != null) { - errorHandler.accept(this, ioe); - } + errors = ioe; + log.debug(ioe.getMessage()); } catch (Throwable t) { if (! CommandExecutor.isBrokenPipe(t)) { log.warn("{}Connector {}\n{} {}", logPrefix(), toString(), t.getClass().getName(), t.getMessage()); } - if (errorHandler != null) { - errorHandler.accept(this, t); - } - + errors = t; } finally { synchronized (this) { ready = true; + // The copier is ready, but resulted some error, the user requested to be called back, so do that now + if (errorHandler != null && errors != null) { + errorHandler.accept(this, errors); + } log.debug("{}notifying listeners", logPrefix()); } if (callback != null) { callback.accept(this); } synchronized (this) { + // ready now, notify threads waiting notifyAll(); } @@ -142,9 +143,30 @@ public void waitFor() throws InterruptedException { } public boolean isReady() { + try { + return isReadyIOException(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + public boolean isReadyIOException() throws IOException { + if (ready) { + throwIOExceptionIfNeeded(); + } return ready; } + private void throwIOExceptionIfNeeded() throws IOException { + if (errors != null) { + if (errors instanceof IOException) { + throw (IOException) errors; + } else { + throw new IOException(errors); + } + } + } + public long getCount() { return count; From c3de4fb8077adaceb2e2758f3f9fd4619902ad71 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 13:25:19 +0200 Subject: [PATCH 4/9] Another case. The read() must be equivalently implemented --- .../nl/vpro/util/FileCachingInputStream.java | 21 +++++---- .../vpro/util/FileCachingInputStreamTest.java | 44 ++++++++++++++++++- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java index e3d6c71b4..b69dc922d 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java @@ -297,17 +297,11 @@ private FileCachingInputStream( @Override public int read() throws IOException { - try { - if (tempFileInputStream == null) { - // the stream was small, we are reading from the memory buffer - return readFromBuffer(); - } else { - return readFromFile(); - } - } catch (IOException ioe) { - close(); - future.completeExceptionally(ioe); - throw ioe; + if (tempFileInputStream == null) { + // the stream was small, we are reading from the memory buffer + return readFromBuffer(); + } else { + return readFromFile(); } } @@ -411,6 +405,11 @@ private int readFromBuffer(byte[] b) { } } + + /** + * + * @see {@link InputStream#read()} This methods must behave exactly according to that. + */ private int readFromFile() throws IOException { copier.executeIfNotRunning(); int result = tempFileInputStream.read(); diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index 02f8c857e..8fbf62a37 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -374,7 +374,7 @@ public void testWaitForBytesOnZeroBytes() throws IOException, InterruptedExcepti } @Test - public void ioExceptionFromSource() { + public void ioExceptionFromSourceReadBytes() { AbstractThrowableAssert o = assertThatThrownBy(() -> { int sizeOfStream = 10000; try ( @@ -414,6 +414,48 @@ public int read() throws IOException { ; + } + + @Test + public void ioExceptionFromSourceReadByte() { + AbstractThrowableAssert o = assertThatThrownBy(() -> { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); + } + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { + + + int b; + int read = 0; + while (IOUtils.EOF != (b = stream.read())) { + + read++; + log.debug("Read {}", read); + } + } + }) + .isInstanceOf(IOException.class) + .hasMessage("breaking!"); + ; + + } @Test From 63334c51efab057db1effbf64f51cb4c83f2c8f4 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 13:31:37 +0200 Subject: [PATCH 5/9] Added test for future. --- .../vpro/util/FileCachingInputStreamTest.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index 8fbf62a37..8ca4aa2c5 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -374,46 +374,48 @@ public void testWaitForBytesOnZeroBytes() throws IOException, InterruptedExcepti } @Test - public void ioExceptionFromSourceReadBytes() { - AbstractThrowableAssert o = assertThatThrownBy(() -> { - int sizeOfStream = 10000; - try ( - // an input stream that will throw IOException when it's busy with file buffering - InputStream in = new InputStream() { - private int byteCount = 0; - - @Override - public int read() throws IOException { - if (byteCount == (sizeOfStream / 2)) { - throw new IOException("breaking!"); - } - return byteCount++ < sizeOfStream ? 'a' : -1; + public void ioExceptionFromSourceReadBytes() throws IOException { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); } - }; - FileCachingInputStream stream = FileCachingInputStream.builder() - .outputBuffer(2) - .batchSize(100) - .input(in) - .logger(log) - .initialBuffer(4) - .build()) { + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { - byte[] buffer = new byte[500]; + byte[] buffer = new byte[500]; - int n; + + assertThatThrownBy(() -> { int read = 0; + int n; while (IOUtils.EOF != (n = stream.read(buffer))) { assertThat(n).isNotEqualTo(0); read += n; log.debug("Read {}/{}", n, read); } - } - }) - .isInstanceOf(IOException.class) - .hasMessage("breaking!"); - ; - + } + ) + .isInstanceOf(IOException.class) + .hasMessage("breaking!"); + assertThat(stream.getFuture()).isCompletedExceptionally(); + assertThat(stream.available()).isEqualTo(0); + } } @Test From 1f104bd9df3e1aef52b67000967ee05344092b0a Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 13:48:19 +0200 Subject: [PATCH 6/9] Testing another path which was still missed. --- .../vpro/util/FileCachingInputStreamTest.java | 108 +++++++++++------- 1 file changed, 66 insertions(+), 42 deletions(-) diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index 8ca4aa2c5..382275251 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -6,13 +6,16 @@ import java.nio.channels.ClosedByInterruptException; import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; -import org.assertj.core.api.AbstractThrowableAssert; import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.apache.commons.io.IOUtils.EOF; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -39,10 +42,11 @@ public void before(TestInfo testInfo) { FileCachingInputStream.openStreams = 0; } - @Test - public void testRead() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRead(boolean downloadFirst) throws IOException { - try(FileCachingInputStream inputStream = slowReader()) { + try(FileCachingInputStream inputStream = slowReader(downloadFirst)) { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -57,10 +61,11 @@ public void testRead() throws IOException { } - @Test - public void testReadBuffer() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadBuffer(boolean downloadFirst) throws IOException { - try(FileCachingInputStream inputStream = slowReader()) { + try(FileCachingInputStream inputStream = slowReader(downloadFirst)) { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -76,7 +81,6 @@ public void testReadBuffer() throws IOException { } - @Test public void testReadFileGetsBroken() { assertThatThrownBy(() -> { @@ -85,6 +89,7 @@ public void testReadFileGetsBroken() { .builder() .outputBuffer(2) .batchSize(3) + .downloadFirst(false) // if true exception will come from constructor already .batchConsumer((f, c) -> { if (c.getCount() > 300) { // randomly close the file @@ -173,7 +178,7 @@ public void testReadFileGetsInterrupted() throws IOException { assertThat(isInterrupted).withFailMessage("Thread did not get interrupted").isTrue(); } - protected FileCachingInputStream slowReader() { + protected FileCachingInputStream slowReader(boolean downloadFirst) { return FileCachingInputStream.builder() .outputBuffer(2) .batchSize(3) @@ -189,6 +194,7 @@ protected FileCachingInputStream slowReader() { .input(new ByteArrayInputStream(MANY_BYTES)) .initialBuffer(4) .startImmediately(true) + .downloadFirst(downloadFirst) .build(); } @@ -250,6 +256,8 @@ public void testReadLargeBuffer() throws IOException { assertThat(out.toByteArray()).containsExactly(MANY_BYTES); } + + @Test public void testSimple() throws IOException { try ( @@ -309,6 +317,26 @@ public void testWithLargeBuffer() throws IOException { } } + @Test + public void testWithLargeBufferByte() throws IOException { + try ( + FileCachingInputStream inputStream = FileCachingInputStream.builder() + .outputBuffer(2) + .batchSize(3) + .input(new ByteArrayInputStream(HELLO)) + .initialBuffer(2048) + .build()) { + + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int n; + while (EOF != (n = inputStream.read())) { + out.write(n); + } + assertThat(out.toByteArray()).containsExactly(HELLO); + } + } + @Test public void testWithBufferEdge() throws IOException { try ( @@ -403,7 +431,7 @@ public int read() throws IOException { assertThatThrownBy(() -> { int read = 0; int n; - while (IOUtils.EOF != (n = stream.read(buffer))) { + while (EOF != (n = stream.read(buffer))) { assertThat(n).isNotEqualTo(0); read += n; log.debug("Read {}/{}", n, read); @@ -419,47 +447,43 @@ public int read() throws IOException { } @Test - public void ioExceptionFromSourceReadByte() { - AbstractThrowableAssert o = assertThatThrownBy(() -> { - int sizeOfStream = 10000; - try ( - // an input stream that will throw IOException when it's busy with file buffering - InputStream in = new InputStream() { - private int byteCount = 0; - - @Override - public int read() throws IOException { - if (byteCount == (sizeOfStream / 2)) { - throw new IOException("breaking!"); - } - return byteCount++ < sizeOfStream ? 'a' : -1; - } - }; - FileCachingInputStream stream = FileCachingInputStream.builder() - .outputBuffer(2) - .batchSize(100) - .input(in) - .logger(log) - .initialBuffer(4) - .build()) { + public void ioExceptionFromSourceReadByte() throws IOException { + int sizeOfStream = 10000; + try ( + // an input stream that will throw IOException when it's busy with file buffering + InputStream in = new InputStream() { + private int byteCount = 0; + @Override + public int read() throws IOException { + if (byteCount == (sizeOfStream / 2)) { + throw new IOException("breaking!"); + } + return byteCount++ < sizeOfStream ? 'a' : -1; + } + }; + FileCachingInputStream stream = FileCachingInputStream.builder() + .outputBuffer(200) + .batchSize(100) + .input(in) + .logger(log) + .initialBuffer(4) + .build()) { + assertThatThrownBy(() -> { int b; int read = 0; - while (IOUtils.EOF != (b = stream.read())) { + while (EOF != (b = stream.read())) { read++; log.debug("Read {}", read); } - } - }) - .isInstanceOf(IOException.class) - .hasMessage("breaking!"); - ; - - + }).isInstanceOf(IOException.class) + .hasMessage("breaking!"); + } } + @Test public void createPath() { new File("/tmp/bestaatniet").delete(); From 693307960d76601a7717a5cbbf2875c20244a890 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 13:57:56 +0200 Subject: [PATCH 7/9] Testing more paths --- .../src/main/java/nl/vpro/util/FileCachingInputStream.java | 7 +++++-- .../test/java/nl/vpro/util/FileCachingInputStreamTest.java | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java index b69dc922d..899843077 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java @@ -1,7 +1,6 @@ package nl.vpro.util; -import lombok.Getter; -import lombok.SneakyThrows; +import lombok.*; import lombok.extern.slf4j.Slf4j; import java.io.*; @@ -21,6 +20,8 @@ import org.slf4j.LoggerFactory; import org.slf4j.event.Level; +import com.google.common.annotations.VisibleForTesting; + import nl.vpro.logging.Slf4jHelper; /** @@ -44,6 +45,8 @@ public class FileCachingInputStream extends InputStream { private static final int EOF = -1; private final Copier copier; private final byte[] buffer; + @Getter(AccessLevel.PACKAGE) + @VisibleForTesting private final int bufferLength; diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index 382275251..c63568d4a 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -305,9 +305,10 @@ public void testWithLargeBuffer() throws IOException { .outputBuffer(2) .batchSize(3) .input(new ByteArrayInputStream(HELLO)) - .initialBuffer(2048) + //.initialBuffer(2048) .build()) { + assertThat(inputStream.getBufferLength()).isEqualTo(HELLO.length); ByteArrayOutputStream out = new ByteArrayOutputStream(); From 25872c011aa71044abca32b38a85c445d68cebc4 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 15:27:55 +0200 Subject: [PATCH 8/9] Details. --- .../src/main/java/nl/vpro/util/Copier.java | 21 +++++++++++-------- .../nl/vpro/util/FileCachingInputStream.java | 21 +++++++++++++++++-- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java index 8fbf1338f..2b7ee5cf9 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/Copier.java @@ -22,7 +22,7 @@ public class Copier implements Runnable, Closeable { private boolean ready; - private Throwable errors; + private Throwable expection; private long count; private final InputStream input; @@ -107,19 +107,19 @@ public void run() { } } } catch (IOException ioe) { - errors = ioe; + expection = ioe; log.debug(ioe.getMessage()); } catch (Throwable t) { if (! CommandExecutor.isBrokenPipe(t)) { log.warn("{}Connector {}\n{} {}", logPrefix(), toString(), t.getClass().getName(), t.getMessage()); } - errors = t; + expection = t; } finally { synchronized (this) { ready = true; // The copier is ready, but resulted some error, the user requested to be called back, so do that now - if (errorHandler != null && errors != null) { - errorHandler.accept(this, errors); + if (errorHandler != null && expection != null) { + errorHandler.accept(this, expection); } log.debug("{}notifying listeners", logPrefix()); } @@ -158,16 +158,19 @@ public boolean isReadyIOException() throws IOException { } private void throwIOExceptionIfNeeded() throws IOException { - if (errors != null) { - if (errors instanceof IOException) { - throw (IOException) errors; + if (expection != null) { + if (expection instanceof IOException) { + throw (IOException) expection; } else { - throw new IOException(errors); + throw new IOException(expection); } } } + /** + * Returns the number of bytes read from the input stream so far + */ public long getCount() { return count; } diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java index 899843077..d10575df7 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/FileCachingInputStream.java @@ -363,6 +363,11 @@ public String toString() { return super.toString() + " for " + tempFile; } + + /** + * Wait until the copier thread read at least the number of bytes given. + * + */ public synchronized long waitForBytesRead(int atLeast) throws InterruptedException { if (copier != null) { copier.executeIfNotRunning(); @@ -375,15 +380,24 @@ public synchronized long waitForBytesRead(int atLeast) throws InterruptedExcepti } } + /** + * Returns the number of bytes consumed from the input stream so far + */ public long getCount() { return copier == null ? bufferLength : copier.getCount(); } + /** + * If a temp file is used for buffering, you can may obtain it. + */ public Path getTempFile() { return tempFile; } + /** + * One of the paths of {@link #read()}, when it is reading from memory. + */ private int readFromBuffer() { if (count < bufferLength) { int result = buffer[(int) count++]; @@ -397,6 +411,9 @@ private int readFromBuffer() { } } + /** + * One of the paths of {@link #read(byte[])} )}, when it is reading from memory. + */ private int readFromBuffer(byte[] b) { int toCopy = Math.min(b.length, bufferLength - (int) count); if (toCopy > 0) { @@ -411,7 +428,7 @@ private int readFromBuffer(byte[] b) { /** * - * @see {@link InputStream#read()} This methods must behave exactly according to that. + * See {@link InputStream#read()} This methods must behave exactly according to that. */ private int readFromFile() throws IOException { copier.executeIfNotRunning(); @@ -448,7 +465,7 @@ private int readFromFile() throws IOException { /** * - * @see {@link InputStream#read(byte[])} This methods must behave exactly according to that. + * See {@link InputStream#read(byte[])} This methods must behave exactly according to that. */ private int readFromFile(byte[] b) throws IOException { copier.executeIfNotRunning(); From dc7149271ee2250663ccc5a4660d4d4dd7321c4a Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Fri, 23 Oct 2020 15:29:13 +0200 Subject: [PATCH 9/9] comments only. --- .../src/test/java/nl/vpro/util/FileCachingInputStreamTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java index c63568d4a..3b74bdf6d 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/FileCachingInputStreamTest.java @@ -433,7 +433,7 @@ public int read() throws IOException { int read = 0; int n; while (EOF != (n = stream.read(buffer))) { - assertThat(n).isNotEqualTo(0); + assertThat(n).isNotEqualTo(0); // cannot happen (unless buffer length == 0) according to contract read += n; log.debug("Read {}/{}", n, read); }