Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static <D> FileReader<D> openReader(SeekableInput in, DatumReader<D> read
// read magic header
byte[] magic = new byte[MAGIC.length];
in.seek(0);
for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length - c)) {
for (int c = 0; c < magic.length; c += in.read(magic, c, magic.length - c)) {
}
in.seek(0);

Expand Down Expand Up @@ -92,13 +92,13 @@ public static <D> DataFileReader<D> openReader(SeekableInput in, DatumReader<D>
* Construct a reader for a file. For example,if you want to read a file
* record,you need to close the resource. You can use try-with-resource as
* follows:
*
*
* <pre>
* try (FileReader<User> dataFileReader =
* DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
* (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
* file: %s", file); }
*
*
* <pre/>
*/
public DataFileReader(File file, DatumReader<D> reader) throws IOException {
Expand All @@ -109,13 +109,13 @@ public DataFileReader(File file, DatumReader<D> reader) throws IOException {
* Construct a reader for a file. For example,if you want to read a file
* record,you need to close the resource. You can use try-with-resource as
* follows:
*
*
* <pre>
* try (FileReader<User> dataFileReader =
* DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
* (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
* file: %s", file); }
*
*
* <pre/>
*/
public DataFileReader(SeekableInput sin, DatumReader<D> reader) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import java.lang.management.OperatingSystemMXBean;
import java.nio.file.Files;
import java.nio.file.Path;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.junit.Test;
import com.sun.management.UnixOperatingSystemMXBean;

@SuppressWarnings("restriction")
public class TestDataFileReader {
Expand Down Expand Up @@ -69,6 +72,65 @@ private long getNumberOfOpenFileDescriptors() {
return 0;
}

@Test
// regression test for bug AVRO-2944
public void testThrottledInputStream() throws IOException {
// AVRO-2944 describes hanging/failure in reading Avro file with performing
// magic header check. This happens with throttled input stream,
// where we read into buffer less bytes than requested.

Schema legacySchema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
.parse("{\"type\": \"record\", \"name\": \"TestSchema\", \"fields\": "
+ "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}");
File f = Files.createTempFile("testThrottledInputStream", ".avro").toFile();
try (DataFileWriter<?> w = new DataFileWriter<>(new GenericDatumWriter<>())) {
w.create(legacySchema, f);
w.flush();
}

// Without checking for magic header, throttled input has no effect
FileReader r = new DataFileReader(throttledInputStream(f), new GenericDatumReader<>());
assertEquals("TestSchema", r.getSchema().getName());

// With checking for magic header, throttled input should pass too.
FileReader r2 = DataFileReader.openReader(throttledInputStream(f), new GenericDatumReader<>());
assertEquals("TestSchema", r2.getSchema().getName());
}

private SeekableInput throttledInputStream(File f) throws IOException {
SeekableFileInput input = new SeekableFileInput(f);
return new SeekableInput() {
@Override
public void close() throws IOException {
input.close();
}

@Override
public void seek(long p) throws IOException {
input.seek(p);
}

@Override
public long tell() throws IOException {
return input.tell();
}

@Override
public long length() throws IOException {
return input.length();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (len == 1) {
return input.read(b, off, len);
} else {
return input.read(b, off, len - 1);
}
}
};
}

@Test
public void testIgnoreSchemaValidationOnRead() throws IOException {
// This schema has an accent in the name and the default for the field doesn't
Expand Down