diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java index b06105f46e1..f0278528d46 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java @@ -58,7 +58,7 @@ public static FileReader openReader(SeekableInput in, DatumReader read // read magic header byte[] magic = new byte[MAGIC.length]; in.seek(0); - for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length - c)) { + for (int c = 0; c < magic.length; c += in.read(magic, c, magic.length - c)) { } in.seek(0); @@ -92,13 +92,13 @@ public static DataFileReader openReader(SeekableInput in, DatumReader * Construct a reader for a file. For example,if you want to read a file * record,you need to close the resource. You can use try-with-resource as * follows: - * + * *
    * try (FileReader dataFileReader =
    * DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
    * (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
    * file: %s", file); }
-   * 
+   *
    * 
    */
   public DataFileReader(File file, DatumReader reader) throws IOException {
@@ -109,13 +109,13 @@ public DataFileReader(File file, DatumReader reader) throws IOException {
    * Construct a reader for a file. For example,if you want to read a file
    * record,you need to close the resource. You can use try-with-resource as
    * follows:
-   * 
+   *
    * 
    * try (FileReader dataFileReader =
    * DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
    * (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
    * file: %s", file); }
-   * 
+   *
    * 
    */
   public DataFileReader(SeekableInput sin, DatumReader reader) throws IOException {
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
index a4e10434a58..c222685e2d5 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
@@ -28,13 +28,16 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import com.sun.management.UnixOperatingSystemMXBean;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.junit.Test;
-import com.sun.management.UnixOperatingSystemMXBean;
 
 @SuppressWarnings("restriction")
 public class TestDataFileReader {
@@ -69,6 +72,65 @@ private long getNumberOfOpenFileDescriptors() {
     return 0;
   }
 
+  @Test
+  // regression test for bug AVRO-2944
+  public void testThrottledInputStream() throws IOException {
+    // AVRO-2944 describes hanging/failure in reading Avro file with performing
+    // magic header check. This happens with throttled input stream,
+    // where we read into buffer less bytes than requested.
+
+    Schema legacySchema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
+        .parse("{\"type\": \"record\", \"name\": \"TestSchema\", \"fields\": "
+            + "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}");
+    File f = Files.createTempFile("testThrottledInputStream", ".avro").toFile();
+    try (DataFileWriter w = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      w.create(legacySchema, f);
+      w.flush();
+    }
+
+    // Without checking for magic header, throttled input has no effect
+    FileReader r = new DataFileReader(throttledInputStream(f), new GenericDatumReader<>());
+    assertEquals("TestSchema", r.getSchema().getName());
+
+    // With checking for magic header, throttled input should pass too.
+    FileReader r2 = DataFileReader.openReader(throttledInputStream(f), new GenericDatumReader<>());
+    assertEquals("TestSchema", r2.getSchema().getName());
+  }
+
+  private SeekableInput throttledInputStream(File f) throws IOException {
+    SeekableFileInput input = new SeekableFileInput(f);
+    return new SeekableInput() {
+      @Override
+      public void close() throws IOException {
+        input.close();
+      }
+
+      @Override
+      public void seek(long p) throws IOException {
+        input.seek(p);
+      }
+
+      @Override
+      public long tell() throws IOException {
+        return input.tell();
+      }
+
+      @Override
+      public long length() throws IOException {
+        return input.length();
+      }
+
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 1) {
+          return input.read(b, off, len);
+        } else {
+          return input.read(b, off, len - 1);
+        }
+      }
+    };
+  }
+
   @Test
   public void testIgnoreSchemaValidationOnRead() throws IOException {
     // This schema has an accent in the name and the default for the field doesn't