Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions vpro-shared-util/src/main/java/nl/vpro/util/Copier.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class Copier implements Runnable, Closeable {

private boolean ready;
private Throwable expection;
private long count;

private final InputStream input;
Expand All @@ -32,7 +33,6 @@ public class Copier implements Runnable, Closeable {
private final Consumer<Copier> batchConsumer;
private Future<?> future;
private final String name;

private final Object notify;


Expand Down Expand Up @@ -107,26 +107,27 @@ public void run() {
}
}
} catch (IOException ioe) {
if (errorHandler != null) {
errorHandler.accept(this, ioe);
}
expection = ioe;
log.debug(ioe.getMessage());
} catch (Throwable t) {
if (! CommandExecutor.isBrokenPipe(t)) {
log.warn("{}Connector {}\n{} {}", logPrefix(), toString(), t.getClass().getName(), t.getMessage());
}
if (errorHandler != null) {
errorHandler.accept(this, t);
}

expection = t;
} finally {
synchronized (this) {
ready = true;
// The copier is ready, but resulted some error, the user requested to be called back, so do that now
if (errorHandler != null && expection != null) {
errorHandler.accept(this, expection);
}
log.debug("{}notifying listeners", logPrefix());
}
if (callback != null) {
callback.accept(this);
}
synchronized (this) {
// ready now, notify threads waiting
notifyAll();
}

Expand All @@ -142,10 +143,34 @@ public void waitFor() throws InterruptedException {
}

public boolean isReady() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thie method should always throw IOException, let the caller handle it if wanted so. You always want to propagate the error.

try {
return isReadyIOException();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

public boolean isReadyIOException() throws IOException {
if (ready) {
throwIOExceptionIfNeeded();
}
return ready;
}

private void throwIOExceptionIfNeeded() throws IOException {
if (expection != null) {
if (expection instanceof IOException) {
throw (IOException) expection;
} else {
throw new IOException(expection);
}
}
}


/**
* Returns the number of bytes read from the input stream so far
*/
public long getCount() {
return count;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nl.vpro.util;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.*;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
Expand All @@ -21,6 +20,8 @@
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import com.google.common.annotations.VisibleForTesting;

import nl.vpro.logging.Slf4jHelper;

/**
Expand All @@ -44,6 +45,8 @@ public class FileCachingInputStream extends InputStream {
private static final int EOF = -1;
private final Copier copier;
private final byte[] buffer;
@Getter(AccessLevel.PACKAGE)
@VisibleForTesting
private final int bufferLength;


Expand Down Expand Up @@ -297,34 +300,20 @@ private FileCachingInputStream(

@Override
public int read() throws IOException {
try {
if (tempFileInputStream == null) {
// the stream was small, we are reading from the memory buffer
return readFromBuffer();
} else {
return readFromFile();
}
} catch (IOException ioe) {
close();
future.completeExceptionally(ioe);
throw ioe;
if (tempFileInputStream == null) {
// the stream was small, we are reading from the memory buffer
return readFromBuffer();
} else {
return readFromFile();
}
}

@Override
public int read(@NonNull byte[] b) throws IOException {
try {
if (tempFileInputStream == null) {
return readFromBuffer(b);
} else {
return readFromFile(b);
}
} catch (IOException ioe) {
if (! closed) {
close();
}
future.completeExceptionally(ioe);
throw ioe;
if (tempFileInputStream == null) {
return readFromBuffer(b);
} else {
return readFromFile(b);
}
}

Expand Down Expand Up @@ -374,6 +363,11 @@ public String toString() {
return super.toString() + " for " + tempFile;
}


/**
* Wait until the copier thread read at least the number of bytes given.
*
*/
public synchronized long waitForBytesRead(int atLeast) throws InterruptedException {
if (copier != null) {
copier.executeIfNotRunning();
Expand All @@ -386,15 +380,24 @@ public synchronized long waitForBytesRead(int atLeast) throws InterruptedExcepti
}
}

/**
* Returns the number of bytes consumed from the input stream so far
*/
public long getCount() {
return copier == null ? bufferLength : copier.getCount();
}

/**
* If a temp file is used for buffering, you can may obtain it.
*/
public Path getTempFile() {
return tempFile;
}


/**
* One of the paths of {@link #read()}, when it is reading from memory.
*/
private int readFromBuffer() {
if (count < bufferLength) {
int result = buffer[(int) count++];
Expand All @@ -408,6 +411,9 @@ private int readFromBuffer() {
}
}

/**
* One of the paths of {@link #read(byte[])} )}, when it is reading from memory.
*/
private int readFromBuffer(byte[] b) {
int toCopy = Math.min(b.length, bufferLength - (int) count);
if (toCopy > 0) {
Expand All @@ -419,12 +425,17 @@ private int readFromBuffer(byte[] b) {
}
}


/**
*
* See {@link InputStream#read()} This methods must behave exactly according to that.
*/
private int readFromFile() throws IOException {
copier.executeIfNotRunning();
int result = tempFileInputStream.read();
while (result == EOF) {
synchronized (copier) {
while (!copier.isReady() && result == EOF) {
while (!copier.isReadyIOException() && result == EOF) {
log.debug("Copier {} not yet ready", copier.logPrefix());
// copier is still busy, wait a second, and try again.
try {
Expand All @@ -437,7 +448,7 @@ private int readFromFile() throws IOException {
}
result = tempFileInputStream.read();
}
if (copier.isReady() && result == EOF) {
if (copier.isReadyIOException() && result == EOF) {
// the copier did not return any new results
// don't increase count but return now.
return EOF;
Expand All @@ -452,9 +463,13 @@ private int readFromFile() throws IOException {
return result;
}

/**
*
* See {@link InputStream#read(byte[])} This methods must behave exactly according to that.
*/
private int readFromFile(byte[] b) throws IOException {
copier.executeIfNotRunning();
if (copier.isReady() && count == copier.getCount()) {
if (copier.isReadyIOException() && count == copier.getCount()) {
return EOF;
}
int totalResult = 0;
Expand All @@ -463,7 +478,7 @@ private int readFromFile(byte[] b) throws IOException {

if (totalResult == 0) {

while (!copier.isReady() && totalResult == 0) {
while (!copier.isReadyIOException() && totalResult == 0) {
log.debug("Copier {} not yet ready", copier.logPrefix());
try {
copier.wait(1000);
Expand All @@ -473,13 +488,17 @@ private int readFromFile(byte[] b) throws IOException {
throw new InterruptedIOException(e.getMessage());
}
int subResult = Math.max(tempFileInputStream.read(b, totalResult, b.length - totalResult), 0);
totalResult += subResult;
totalResult += subResult;
}
if (totalResult == 0) {
// I doubt this can happen
return EOF;
}

}
count += totalResult;
}

assert totalResult != 0;
//log.debug("returning {} bytes", totalResult);
return totalResult;
}
Expand Down
Loading