From 43314ca15ec1735a77cbff69edf7b64557be2e13 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 22 Nov 2017 13:00:34 -0500 Subject: [PATCH] ARROW-1047: [Java] Add Generic Reader Interface for Stream Format Change-Id: I7a59a24bd54339cd637ace36e991bc062ba1d4e1 --- .../org/apache/arrow/tools/EchoServer.java | 4 +- .../org/apache/arrow/tools/FileRoundtrip.java | 4 +- .../org/apache/arrow/tools/FileToStream.java | 4 +- .../org/apache/arrow/tools/Integration.java | 10 +- .../org/apache/arrow/tools/StreamToFile.java | 4 +- .../arrow/tools/ArrowFileTestFixtures.java | 6 +- .../apache/arrow/tools/EchoServerTest.java | 4 +- .../main/codegen/templates/UnionVector.java | 2 +- .../arrow/vector/BaseDataValueVector.java | 2 +- .../vector/BaseNullableFixedWidthVector.java | 2 +- .../BaseNullableVariableWidthVector.java | 2 +- .../org/apache/arrow/vector/BitVector.java | 2 +- .../apache/arrow/vector/BitVectorHelper.java | 2 +- .../org/apache/arrow/vector/BufferBacked.java | 2 +- .../org/apache/arrow/vector/FieldVector.java | 2 +- .../org/apache/arrow/vector/VectorLoader.java | 6 +- .../apache/arrow/vector/VectorUnloader.java | 6 +- .../org/apache/arrow/vector/ZeroVector.java | 2 +- .../vector/complex/FixedSizeListVector.java | 2 +- .../arrow/vector/complex/ListVector.java | 2 +- .../vector/complex/NullableMapVector.java | 2 +- .../vector/{file => ipc}/ArrowFileReader.java | 59 +++++-- .../vector/{file => ipc}/ArrowFileWriter.java | 4 +- .../vector/{file => ipc}/ArrowMagic.java | 4 +- .../vector/{file => ipc}/ArrowReader.java | 150 ++++++++++-------- .../arrow/vector/ipc/ArrowStreamReader.java | 148 +++++++++++++++++ .../{stream => ipc}/ArrowStreamWriter.java | 11 +- .../vector/{file => ipc}/ArrowWriter.java | 9 +- .../InvalidArrowFileException.java | 2 +- .../{file/json => ipc}/JsonFileReader.java | 10 +- .../{file/json => ipc}/JsonFileWriter.java | 6 +- .../vector/{file => ipc}/ReadChannel.java | 2 +- .../{file => ipc}/SeekableReadChannel.java | 2 +- .../vector/{file => ipc}/WriteChannel.java | 4 +- .../{file => ipc/message}/ArrowBlock.java | 3 +- .../{schema => ipc/message}/ArrowBuffer.java | 2 +- .../message}/ArrowDictionaryBatch.java | 2 +- .../message}/ArrowFieldNode.java | 2 +- .../{file => ipc/message}/ArrowFooter.java | 5 +- .../{schema => ipc/message}/ArrowMessage.java | 2 +- .../message}/ArrowRecordBatch.java | 8 +- .../message}/ArrowVectorType.java | 2 +- .../message}/FBSerializable.java | 2 +- .../message}/FBSerializables.java | 2 +- .../ipc/message/MessageChannelReader.java | 115 ++++++++++++++ .../vector/ipc/message/MessageReader.java | 65 ++++++++ .../message}/MessageSerializer.java | 91 +++++++---- .../{schema => ipc/message}/TypeLayout.java | 46 +++--- .../{schema => ipc/message}/VectorLayout.java | 23 ++- .../vector/stream/ArrowStreamReader.java | 66 -------- .../apache/arrow/vector/types/pojo/Field.java | 8 +- .../apache/arrow/vector/TestValueVector.java | 6 +- .../arrow/vector/TestVectorUnloadLoad.java | 4 +- .../vector/{file => ipc}/BaseFileTest.java | 2 +- .../MessageSerializerTest.java | 13 +- .../vector/{file => ipc}/TestArrowFile.java | 61 +++---- .../vector/{file => ipc}/TestArrowFooter.java | 4 +- .../{file => ipc}/TestArrowReaderWriter.java | 21 ++- .../vector/{file => ipc}/TestArrowStream.java | 15 +- .../{file => ipc}/TestArrowStreamPipe.java | 50 +++--- .../{file/json => ipc}/TestJSONFile.java | 3 +- 61 files changed, 698 insertions(+), 408 deletions(-) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ArrowFileReader.java (77%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ArrowFileWriter.java (94%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ArrowMagic.java (93%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ArrowReader.java (65%) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java rename java/vector/src/main/java/org/apache/arrow/vector/{stream => ipc}/ArrowStreamWriter.java (84%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ArrowWriter.java (95%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/InvalidArrowFileException.java (96%) rename java/vector/src/main/java/org/apache/arrow/vector/{file/json => ipc}/JsonFileReader.java (98%) rename java/vector/src/main/java/org/apache/arrow/vector/{file/json => ipc}/JsonFileWriter.java (98%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/ReadChannel.java (98%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/SeekableReadChannel.java (97%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc}/WriteChannel.java (97%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc/message}/ArrowBlock.java (96%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowBuffer.java (97%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowDictionaryBatch.java (97%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowFieldNode.java (97%) rename java/vector/src/main/java/org/apache/arrow/vector/{file => ipc/message}/ArrowFooter.java (96%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowMessage.java (96%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowRecordBatch.java (94%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/ArrowVectorType.java (98%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/FBSerializable.java (95%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/FBSerializables.java (96%) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java rename java/vector/src/main/java/org/apache/arrow/vector/{stream => ipc/message}/MessageSerializer.java (86%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/TypeLayout.java (80%) rename java/vector/src/main/java/org/apache/arrow/vector/{schema => ipc/message}/VectorLayout.java (89%) delete mode 100644 java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/BaseFileTest.java (99%) rename java/vector/src/test/java/org/apache/arrow/vector/{stream => ipc}/MessageSerializerTest.java (94%) rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/TestArrowFile.java (94%) rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/TestArrowFooter.java (93%) rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/TestArrowReaderWriter.java (85%) rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/TestArrowStream.java (88%) rename java/vector/src/test/java/org/apache/arrow/vector/{file => ipc}/TestArrowStreamPipe.java (78%) rename java/vector/src/test/java/org/apache/arrow/vector/{file/json => ipc}/TestJSONFile.java (99%) diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index 3091bc4dab1..ce6b5164a8c 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -23,8 +23,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.stream.ArrowStreamReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java index ab8fa6e45ce..6e45305bf6c 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -22,8 +22,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.file.ArrowFileReader; -import org.apache.arrow.vector.file.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java index 6722b30fa7f..3db01f40c59 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -21,8 +21,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.file.ArrowFileReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import java.io.File; import java.io.FileInputStream; diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index d2b35e65a81..666f1ddeabc 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -22,11 +22,11 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFileReader; -import org.apache.arrow.vector.file.ArrowFileWriter; -import org.apache.arrow.vector.file.json.JsonFileReader; -import org.apache.arrow.vector.file.json.JsonFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.JsonFileReader; +import org.apache.arrow.vector.ipc.JsonFileWriter; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java index ef1a11f6bfa..42d336af9b0 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -21,8 +21,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.file.ArrowFileWriter; -import org.apache.arrow.vector.stream.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowStreamReader; import java.io.File; import java.io.FileInputStream; diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java index c56a5a3303b..eac517d96bd 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -28,9 +28,9 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFileReader; -import org.apache.arrow.vector.file.ArrowFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Assert; diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index 89714e44791..d8693c59615 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -44,8 +44,8 @@ import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; -import org.apache.arrow.vector.stream.ArrowStreamReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.ArrowType.Int; diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index e44edbd47b6..73165315ea5 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -32,7 +32,7 @@ import org.apache.arrow.vector.BaseDataValueVector; import org.apache.arrow.vector.complex.impl.ComplexCopier; import org.apache.arrow.vector.util.CallBack; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.vector.BaseValueVector; import org.apache.arrow.vector.util.OversizedAllocationException; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java index 38524ff8adf..6d9eb1db03a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import io.netty.buffer.ArrowBuf; import org.apache.arrow.vector.util.CallBack; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java index 209758e4ece..f82077f692f 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java @@ -28,7 +28,7 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.util.CallBack; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java index edf4987de57..b9e5442ecf6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java @@ -25,7 +25,7 @@ import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.complex.NullableMapVector; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.util.CallBack; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index c6d404e1543..26c81700883 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -24,7 +24,7 @@ import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.holders.BitHolder; import org.apache.arrow.vector.holders.NullableBitHolder; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.OversizedAllocationException; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java index 23252ca697b..2d4db85c583 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java @@ -20,7 +20,7 @@ import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; /** * Helper class for performing generic operations on a bit vector buffer. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java index a0dbf2bdcf1..332ca228a43 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java @@ -18,7 +18,7 @@ package org.apache.arrow.vector; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import io.netty.buffer.ArrowBuf; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java index c2ed17eb4dd..509eeda7500 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java @@ -20,7 +20,7 @@ import java.util.List; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.pojo.Field; import io.netty.buffer.ArrowBuf; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 58fc80bbba1..2cd4099c669 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.List; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.schema.VectorLayout; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.VectorLayout; import org.apache.arrow.vector.types.pojo.Field; import com.google.common.collect.Iterators; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index fd9677312ed..2b034894ab1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -22,9 +22,9 @@ import java.util.List; import io.netty.buffer.ArrowBuf; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.schema.ArrowVectorType; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.ArrowVectorType; public class VectorUnloader { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java index 3cc93a2a34f..0ab3a7b6843 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java @@ -28,7 +28,7 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.complex.impl.NullReader; import org.apache.arrow.vector.complex.reader.FieldReader; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType.Null; import org.apache.arrow.vector.types.pojo.Field; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java index 6713b1c7871..774a10dbfb9 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java @@ -33,7 +33,7 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.*; import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index afe86a692c3..d50d4c447fe 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -42,7 +42,7 @@ import org.apache.arrow.vector.complex.impl.UnionListWriter; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.FieldWriter; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java index f95302f55f8..e223d1ce674 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java @@ -34,7 +34,7 @@ import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl; import org.apache.arrow.vector.complex.impl.NullableMapWriter; import org.apache.arrow.vector.holders.ComplexHolder; -import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.ArrowType.Struct; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java similarity index 77% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java index d711b9c6c1e..4cd70262261 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.ByteBuffer; @@ -26,32 +26,45 @@ import org.apache.arrow.flatbuf.Footer; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.schema.ArrowDictionaryBatch; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.stream.MessageSerializer; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowFooter; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ArrowFileReader extends ArrowReader { +public class ArrowFileReader extends ArrowReader { private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class); + private SeekableReadChannel in; private ArrowFooter footer; private int currentDictionaryBatch = 0; private int currentRecordBatch = 0; + public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { + super(allocator); + this.in = in; + } + public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) { - super(new SeekableReadChannel(in), allocator); + this(new SeekableReadChannel(in), allocator); } - public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { - super(in, allocator); + @Override + public long bytesRead() { + return in.bytesRead(); + } + + @Override + protected void closeReadSource() throws IOException { + in.close(); } @Override - protected Schema readSchema(SeekableReadChannel in) throws IOException { + protected Schema readSchema() throws IOException { if (footer == null) { if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) { throw new InvalidArrowFileException("file too small: " + in.size()); @@ -82,18 +95,30 @@ protected Schema readSchema(SeekableReadChannel in) throws IOException { } @Override - protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException { - if (currentDictionaryBatch < footer.getDictionaries().size()) { - ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++); - return readDictionaryBatch(in, block, allocator); - } else if (currentRecordBatch < footer.getRecordBatches().size()) { + public ArrowDictionaryBatch readDictionary() throws IOException { + if (currentDictionaryBatch >= footer.getDictionaries().size()) { + throw new IOException("Requested more dictionaries than defined in footer: " + currentDictionaryBatch); + } + ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++); + return readDictionaryBatch(in, block, allocator); + } + + // Returns true if a batch was read, false if no more batches + @Override + public boolean loadNextBatch() throws IOException { + prepareLoadNextBatch(); + + if (currentRecordBatch < footer.getRecordBatches().size()) { ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++); - return readRecordBatch(in, block, allocator); + ArrowRecordBatch batch = readRecordBatch(in, block, allocator); + loadRecordBatch(batch); + return true; } else { - return null; + return false; } } + public List getDictionaryBlocks() throws IOException { ensureInitialized(); return footer.getDictionaries(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java similarity index 94% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java index 1d92d2bde1c..1b687c9f269 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -24,6 +24,8 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowFooter; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java similarity index 93% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java index 68313e7878b..a9310a608ae 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; + +import org.apache.arrow.vector.ipc.WriteChannel; import java.io.IOException; import java.nio.charset.StandardCharsets; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java similarity index 65% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index 21fb2207eb0..6d708a03cad 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.util.ArrayList; @@ -33,32 +33,25 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.schema.ArrowDictionaryBatch; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowMessage.ArrowMessageVisitor; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.DictionaryUtility; /** - * Abstract class to read ArrowRecordBatches from a ReadChannel. + * Abstract class to read Schema and ArrowRecordBatches. * - * @param Type of ReadChannel to use */ -public abstract class ArrowReader implements DictionaryProvider, AutoCloseable { - - private final T in; - private final BufferAllocator allocator; +public abstract class ArrowReader implements DictionaryProvider, AutoCloseable { + protected final BufferAllocator allocator; private VectorLoader loader; private VectorSchemaRoot root; private Map dictionaries; - private boolean initialized = false; - protected ArrowReader(T in, BufferAllocator allocator) { - this.in = in; + protected ArrowReader(BufferAllocator allocator) { this.allocator = allocator; } @@ -105,58 +98,18 @@ public Dictionary lookup(long id) { * @return true if a batch was read, false on EOS * @throws IOException */ - public boolean loadNextBatch() throws IOException { - ensureInitialized(); - // read in all dictionary batches, then stop after our first record batch - ArrowMessageVisitor visitor = new ArrowMessageVisitor() { - @Override - public Boolean visit(ArrowDictionaryBatch message) { - try { - load(message); - } finally { - message.close(); - } - return true; - } - - @Override - public Boolean visit(ArrowRecordBatch message) { - try { - loader.load(message); - } finally { - message.close(); - } - return false; - } - }; - root.setRowCount(0); - ArrowMessage message = readMessage(in, allocator); - - boolean readBatch = false; - while (message != null) { - if (!message.accepts(visitor)) { - readBatch = true; - break; - } - // else read a dictionary - message = readMessage(in, allocator); - } - - return readBatch; - } + public abstract boolean loadNextBatch() throws IOException; /** * Return the number of bytes read from the ReadChannel. * * @return number of bytes read */ - public long bytesRead() { - return in.bytesRead(); - } + public abstract long bytesRead(); /** * Close resources, including vector schema root and dictionary vectors, and the - * underlying ReadChannel. + * underlying read source. * * @throws IOException */ @@ -167,12 +120,12 @@ public void close() throws IOException { /** * Close resources, including vector schema root and dictionary vectors. If the flag - * closeReadChannel is true then close the underlying ReadChannel, otherwise leave it open. + * closeReadChannel is true then close the underlying read source, otherwise leave it open. * - * @param closeReadChannel Flag to control if closing the underlying ReadChannel + * @param closeReadSource Flag to control if closing the underlying read source * @throws IOException */ - public void close(boolean closeReadChannel) throws IOException { + public void close(boolean closeReadSource) throws IOException { if (initialized) { root.close(); for (Dictionary dictionary : dictionaries.values()) { @@ -180,15 +133,40 @@ public void close(boolean closeReadChannel) throws IOException { } } - if (closeReadChannel) { - in.close(); + if (closeReadSource) { + closeReadSource(); } } - protected abstract Schema readSchema(T in) throws IOException; + /** + * Close the underlying read source. + * + * @throws IOException + */ + protected abstract void closeReadSource() throws IOException; + + /** + * Read the Schema from the source, will be invoked at the beginning the initialization. + * + * @return the read Schema + * @throws IOException + */ + protected abstract Schema readSchema() throws IOException; - protected abstract ArrowMessage readMessage(T in, BufferAllocator allocator) throws IOException; + /** + * Read a dictionary batch from the source, will be invoked after the schema has been read and + * called N times, where N is the number of dictionaries indicated by the schema Fields. + * + * @return the read ArrowDictionaryBatch + * @throws IOException + */ + protected abstract ArrowDictionaryBatch readDictionary() throws IOException; + /** + * Initialize if not done previously. + * + * @throws IOException + */ protected void ensureInitialized() throws IOException { if (!initialized) { initialize(); @@ -200,7 +178,7 @@ protected void ensureInitialized() throws IOException { * Reads the schema and initializes the vectors */ private void initialize() throws IOException { - Schema originalSchema = readSchema(in); + Schema originalSchema = readSchema(); List fields = new ArrayList<>(); List vectors = new ArrayList<>(); Map dictionaries = new HashMap<>(); @@ -216,9 +194,43 @@ private void initialize() throws IOException { this.root = new VectorSchemaRoot(schema, vectors, 0); this.loader = new VectorLoader(root); this.dictionaries = Collections.unmodifiableMap(dictionaries); + + // Read and load all dictionaries from schema + for (int i = 0; i < dictionaries.size(); i++) { + ArrowDictionaryBatch dictionaryBatch = readDictionary(); + loadDictionary(dictionaryBatch); + } } - private void load(ArrowDictionaryBatch dictionaryBatch) { + /** + * Ensure the reader has been initialized and reset the VectorSchemaRoot row count to 0. + * + * @throws IOException + */ + protected void prepareLoadNextBatch() throws IOException { + ensureInitialized(); + root.setRowCount(0); + } + + /** + * Load an ArrowRecordBatch to the readers VectorSchemaRoot. + * + * @param batch the record batch to load + */ + protected void loadRecordBatch(ArrowRecordBatch batch) { + try { + loader.load(batch); + } finally { + batch.close(); + } + } + + /** + * Load an ArrowDictionaryBatch to the readers dictionary vectors. + * + * @param dictionaryBatch + */ + protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { long id = dictionaryBatch.getDictionaryId(); Dictionary dictionary = dictionaries.get(id); if (dictionary == null) { @@ -227,6 +239,10 @@ private void load(ArrowDictionaryBatch dictionaryBatch) { FieldVector vector = dictionary.getVector(); VectorSchemaRoot root = new VectorSchemaRoot(ImmutableList.of(vector.getField()), ImmutableList.of(vector), 0); VectorLoader loader = new VectorLoader(root); - loader.load(dictionaryBatch.getDictionary()); + try { + loader.load(dictionaryBatch.getDictionary()); + } finally { + dictionaryBatch.close(); + } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java new file mode 100644 index 00000000000..d1e48021885 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.ipc; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +import org.apache.arrow.flatbuf.Message; +import org.apache.arrow.flatbuf.MessageHeader; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageReader; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * This classes reads from an input stream and produces ArrowRecordBatches. + */ +public class ArrowStreamReader extends ArrowReader { + + private MessageReader messageReader; + + /** + * Constructs a streaming reader using the MessageReader interface. Non-blocking. + * + * @param messageReader interface to get read messages + * @param allocator to allocate new buffers + */ + public ArrowStreamReader(MessageReader messageReader, BufferAllocator allocator) { + super(allocator); + this.messageReader = messageReader; + } + + /** + * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking. + * + * @param in ReadableByteChannel to read messages from + * @param allocator to allocate new buffers + */ + public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) { + this(new MessageChannelReader(new ReadChannel(in)), allocator); + } + + /** + * Constructs a streaming reader from an InputStream. Non-blocking. + * + * @param in InputStream to read messages from + * @param allocator to allocate new buffers + */ + public ArrowStreamReader(InputStream in, BufferAllocator allocator) { + this(Channels.newChannel(in), allocator); + } + + /** + * Get the number of bytes read from the stream since constructing the reader. + * + * @return number of bytes + */ + @Override + public long bytesRead() { + return messageReader.bytesRead(); + } + + /** + * Closes the underlying read source. + * + * @throws IOException + */ + @Override + protected void closeReadSource() throws IOException { + messageReader.close(); + } + + /** + * Load the next ArrowRecordBatch to the vector schema root if available. + * + * @return true if a batch was read, false on EOS + * @throws IOException + */ + public boolean loadNextBatch() throws IOException { + prepareLoadNextBatch(); + + Message message = messageReader.readNextMessage(); + + // Reached EOS + if (message == null) { + return false; + } + + if (message.headerType() != MessageHeader.RecordBatch) { + throw new IOException("Expected RecordBatch but header was " + message.headerType()); + } + + ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(messageReader, message, allocator); + loadRecordBatch(batch); + return true; + } + + /** + * Reads the schema message from the beginning of the stream. + * + * @return the deserialized arrow schema + */ + @Override + protected Schema readSchema() throws IOException { + return MessageSerializer.deserializeSchema(messageReader); + } + + /** + * Read a dictionary batch message, will be invoked after the schema and before normal record + * batches are read. + * + * @return the deserialized dictionary batch + * @throws IOException + */ + @Override + protected ArrowDictionaryBatch readDictionary() throws IOException { + Message message = messageReader.readNextMessage(); + + if (message.headerType() != MessageHeader.DictionaryBatch) { + throw new IOException("Expected DictionaryBatch but header was " + message.headerType()); + } + + return MessageSerializer.deserializeDictionaryBatch(messageReader, message, allocator); + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java similarity index 84% rename from java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java index b854cd2bb6e..d731d05b81f 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java @@ -16,16 +16,13 @@ * limitations under the License. */ -package org.apache.arrow.vector.stream; +package org.apache.arrow.vector.ipc; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowWriter; -import org.apache.arrow.vector.file.WriteChannel; -import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.ArrowWriter; +import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; import java.io.IOException; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java similarity index 95% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java index 7dc10b5e629..4b483d01050 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -30,9 +30,10 @@ import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.schema.ArrowDictionaryBatch; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.stream.MessageSerializer; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.DictionaryUtility; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java similarity index 96% rename from java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java index 607207f41b0..ad9d8776e33 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; public class InvalidArrowFileException extends RuntimeException { private static final long serialVersionUID = 1L; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java similarity index 98% rename from java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 8017b385d14..cb11a253058 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -16,19 +16,18 @@ * limitations under the License. ******************************************************************************/ -package org.apache.arrow.vector.file.json; +package org.apache.arrow.vector.ipc; import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; import static com.fasterxml.jackson.core.JsonToken.END_OBJECT; import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.arrow.vector.schema.ArrowVectorType.*; +import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*; import java.io.File; import java.io.IOException; import java.math.BigDecimal; -import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,9 +41,8 @@ import org.apache.arrow.vector.*; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.file.InvalidArrowFileException; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowVectorType; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowVectorType; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java similarity index 98% rename from java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java index 0c8507b51f5..22423b844e5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java @@ -16,9 +16,9 @@ * limitations under the License. ******************************************************************************/ -package org.apache.arrow.vector.file.json; +package org.apache.arrow.vector.ipc; -import static org.apache.arrow.vector.schema.ArrowVectorType.*; +import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*; import java.io.File; import java.io.IOException; @@ -33,7 +33,7 @@ import org.apache.arrow.vector.*; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.arrow.vector.schema.ArrowVectorType; +import org.apache.arrow.vector.ipc.message.ArrowVectorType; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java similarity index 98% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java index b0eb8f3d84d..395fd7db597 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java similarity index 97% rename from java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java index 46bea1314da..62ba3b73e53 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.channels.SeekableByteChannel; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java similarity index 97% rename from java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java index 89c9d1f9b7a..da500aa97be 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.IOException; import java.nio.ByteBuffer; @@ -25,7 +25,7 @@ import com.google.flatbuffers.FlatBufferBuilder; import io.netty.buffer.ArrowBuf; -import org.apache.arrow.vector.schema.FBSerializable; +import org.apache.arrow.vector.ipc.message.FBSerializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java similarity index 96% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java index e1b4d6a8b21..8731f77ac2c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc.message; import org.apache.arrow.flatbuf.Block; -import org.apache.arrow.vector.schema.FBSerializable; import com.google.flatbuffers.FlatBufferBuilder; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java similarity index 97% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java index 4e0187e791b..6b0eeaad4d1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import org.apache.arrow.flatbuf.Buffer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java similarity index 97% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java index 635fa3fb423..cd23cb96b6b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import com.google.flatbuffers.FlatBufferBuilder; import org.apache.arrow.flatbuf.DictionaryBatch; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java similarity index 97% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java index 3ed384ed7e2..ca0087f7089 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import org.apache.arrow.flatbuf.FieldNode; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java similarity index 96% rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java index 1e95321fdec..f7794f7364b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java @@ -16,16 +16,15 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc.message; -import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector; +import static org.apache.arrow.vector.ipc.message.FBSerializables.writeAllStructsToVector; import java.util.ArrayList; import java.util.List; import org.apache.arrow.flatbuf.Block; import org.apache.arrow.flatbuf.Footer; -import org.apache.arrow.vector.schema.FBSerializable; import org.apache.arrow.vector.types.pojo.Schema; import com.google.flatbuffers.FlatBufferBuilder; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java similarity index 96% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java index f59b4b6c172..92fb58e16fe 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; public interface ArrowMessage extends FBSerializable, AutoCloseable { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java similarity index 94% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index bf0967a2797..6c6481e74dd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; - -import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector; +package org.apache.arrow.vector.ipc.message; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -113,9 +111,9 @@ public List getBuffersLayout() { @Override public int writeTo(FlatBufferBuilder builder) { RecordBatch.startNodesVector(builder, nodes.size()); - int nodesOffset = writeAllStructsToVector(builder, nodes); + int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes); RecordBatch.startBuffersVector(builder, buffers.size()); - int buffersOffset = writeAllStructsToVector(builder, buffersLayout); + int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java similarity index 98% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java index 9d2fdfaafe4..3342652bedd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import java.util.Map; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java similarity index 95% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java index 91d60ea995b..31f55bd522c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import com.google.flatbuffers.FlatBufferBuilder; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java similarity index 96% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java index ae5aa555e74..6717ed7ab31 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import java.util.ArrayList; import java.util.Collections; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java new file mode 100644 index 00000000000..5bc3e1fff6f --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.ipc.message; + + +import io.netty.buffer.ArrowBuf; +import org.apache.arrow.flatbuf.Message; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ReadChannel; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Reads a sequence of messages using a ReadChannel. + */ +public class MessageChannelReader implements MessageReader { + + private ReadChannel in; + + /** + * Construct from an existing ReadChannel. + * + * @param in Channel to read messages from + */ + public MessageChannelReader(ReadChannel in) { + this.in = in; + } + + /** + * Read the next message from the ReadChannel. + * + * @return A Message or null if ReadChannel has no more messages, indicated by message length of 0 + * @throws IOException + */ + @Override + public Message readNextMessage() throws IOException { + // Read the message size. There is an i32 little endian prefix. + ByteBuffer buffer = ByteBuffer.allocate(4); + if (in.readFully(buffer) != 4) { + return null; + } + int messageLength = MessageSerializer.bytesToInt(buffer.array()); + if (messageLength == 0) { + return null; + } + + buffer = ByteBuffer.allocate(messageLength); + if (in.readFully(buffer) != messageLength) { + throw new IOException( + "Unexpected end of stream trying to read message."); + } + buffer.rewind(); + + return Message.getRootAsMessage(buffer); + } + + /** + * Read a message body from the ReadChannel. + * + * @param message Read message that is followed by a body of data + * @param allocator BufferAllocator to allocate memory for body data + * @return ArrowBuf containing the message body data + * @throws IOException + */ + @Override + public ArrowBuf readMessageBody(Message message, BufferAllocator allocator) throws IOException { + + int bodyLength = (int) message.bodyLength(); + + // Now read the record batch body + ArrowBuf buffer = allocator.buffer(bodyLength); + if (in.readFully(buffer, bodyLength) != bodyLength) { + throw new IOException("Unexpected end of input trying to read batch."); + } + + return buffer; + } + + /** + * Get the number of bytes read from the ReadChannel. + * + * @return number of bytes + */ + @Override + public long bytesRead() { + return in.bytesRead(); + } + + /** + * Close the ReadChannel. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + in.close(); + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java new file mode 100644 index 00000000000..b277c582950 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.ipc.message; + + +import io.netty.buffer.ArrowBuf; +import org.apache.arrow.flatbuf.Message; +import org.apache.arrow.memory.BufferAllocator; + +import java.io.IOException; + +/** + * Interface for reading a sequence of messages. + */ +public interface MessageReader { + + /** + * Read the next message in the sequence. + * + * @return The read message or null if reached the end of the message sequence + * @throws IOException + */ + Message readNextMessage() throws IOException; + + /** + * When a message is followed by a body of data, read that data into an ArrowBuf. This should + * only be called when a Message has a body length > 0. + * + * @param message Read message that is followed by a body of data + * @param allocator BufferAllocator to allocate memory for body data + * @return An ArrowBuf containing the body of the message that was read + * @throws IOException + */ + ArrowBuf readMessageBody(Message message, BufferAllocator allocator) throws IOException; + + /** + * Return the current number of bytes that have been read. + * + * @return number of bytes read + */ + long bytesRead(); + + /** + * Close any resource opened by the message reader, not including message body allocations. + * + * @throws IOException + */ + void close() throws IOException; +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java similarity index 86% rename from java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index c397cec72f0..e2f8f7d9a8d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.stream; +package org.apache.arrow.vector.ipc.message; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,14 +31,8 @@ import org.apache.arrow.flatbuf.MetadataVersion; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ReadChannel; -import org.apache.arrow.vector.file.WriteChannel; -import org.apache.arrow.vector.schema.ArrowBuffer; -import org.apache.arrow.vector.schema.ArrowDictionaryBatch; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; import com.google.flatbuffers.FlatBufferBuilder; @@ -102,12 +96,12 @@ public static long serialize(WriteChannel out, Schema schema) throws IOException /** * Deserializes a schema object. Format is from serialize(). * - * @param in the channel to deserialize from + * @param reader the reader interface to deserialize from * @return the deserialized object * @throws IOException if something went wrong */ - public static Schema deserializeSchema(ReadChannel in) throws IOException { - Message message = deserializeMessage(in); + public static Schema deserializeSchema(MessageReader reader) throws IOException { + Message message = reader.readNextMessage(); if (message == null) { throw new IOException("Unexpected end of input. Missing schema."); } @@ -119,6 +113,16 @@ public static Schema deserializeSchema(ReadChannel in) throws IOException { message.header(new org.apache.arrow.flatbuf.Schema())); } + /** + * Deserializes a schema object. Format is from serialize(). + * + * @param in the channel to deserialize from + * @return the deserialized object + * @throws IOException if something went wrong + */ + public static Schema deserializeSchema(ReadChannel in) throws IOException { + return deserializeSchema(new MessageChannelReader(in)); + } /** * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch. @@ -184,25 +188,20 @@ public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) t } /** - * Deserializes a RecordBatch + * Deserializes a RecordBatch. * - * @param in the channel to deserialize from + * @param reader the reader interface to deserialize from * @param message the object to derialize to * @param alloc to allocate buffers * @return the deserialized object * @throws IOException if something went wrong */ - public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, Message message, BufferAllocator alloc) + public static ArrowRecordBatch deserializeRecordBatch(MessageReader reader, Message message, BufferAllocator alloc) throws IOException { RecordBatch recordBatchFB = (RecordBatch) message.header(new RecordBatch()); - int bodyLength = (int) message.bodyLength(); - // Now read the record batch body - ArrowBuf buffer = alloc.buffer(bodyLength); - if (in.readFully(buffer, bodyLength) != bodyLength) { - throw new IOException("Unexpected end of input trying to read batch."); - } + ArrowBuf buffer = reader.readMessageBody(message, alloc); return deserializeRecordBatch(recordBatchFB, buffer); } @@ -243,7 +242,14 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock return deserializeRecordBatch(recordBatchFB, body); } - // Deserializes a record batch given the Flatbuffer metadata and in-memory body + /** + * Deserializes a record batch given the Flatbuffer metadata and in-memory body. + * + * @param recordBatchFB Deserialized FlatBuffer record batch + * @param body Read body of the record batch + * @return ArrowRecordBatch from metadata and in-memory body + * @throws IOException + */ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf body) throws IOException { // Now read the body @@ -314,26 +320,21 @@ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) } /** - * Deserializes a DictionaryBatch + * Deserializes a DictionaryBatch. * - * @param in where to read from + * @param reader where to read from * @param message the message message metadata to deserialize * @param alloc the allocator for new buffers * @return the corresponding dictionary batch * @throws IOException if something went wrong */ - public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in, + public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageReader reader, Message message, BufferAllocator alloc) throws IOException { DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch()); - int bodyLength = (int) message.bodyLength(); - // Now read the record batch body - ArrowBuf body = alloc.buffer(bodyLength); - if (in.readFully(body, bodyLength) != bodyLength) { - throw new IOException("Unexpected end of input trying to read batch."); - } + ArrowBuf body = reader.readMessageBody(message, alloc); ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body); return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch); } @@ -377,8 +378,16 @@ public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in, return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch); } - public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException { - Message message = deserializeMessage(in); + /** + * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch. + * + * @param reader Interface to read messages from + * @param alloc Allocator for message data + * @return The deserialized record batch + * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch + */ + public static ArrowMessage deserializeMessageBatch(MessageReader reader, BufferAllocator alloc) throws IOException { + Message message = reader.readNextMessage(); if (message == null) { return null; } else if (message.bodyLength() > Integer.MAX_VALUE) { @@ -391,14 +400,26 @@ public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocat switch (message.headerType()) { case MessageHeader.RecordBatch: - return deserializeRecordBatch(in, message, alloc); + return deserializeRecordBatch(reader, message, alloc); case MessageHeader.DictionaryBatch: - return deserializeDictionaryBatch(in, message, alloc); + return deserializeDictionaryBatch(reader, message, alloc); default: throw new IOException("Unexpected message header type " + message.headerType()); } } + /** + * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch. + * + * @param in ReadChannel to read messages from + * @param alloc Allocator for message data + * @return The deserialized record batch + * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch + */ + public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException { + return deserializeMessageBatch(new MessageChannelReader(in), alloc); + } + /** * Serializes a message header. * diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java similarity index 80% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java index 29407bf1ab4..06fe9481686 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java @@ -16,15 +16,9 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; +package org.apache.arrow.vector.ipc.message; import static java.util.Arrays.asList; -import static org.apache.arrow.vector.schema.VectorLayout.booleanVector; -import static org.apache.arrow.vector.schema.VectorLayout.byteVector; -import static org.apache.arrow.vector.schema.VectorLayout.dataVector; -import static org.apache.arrow.vector.schema.VectorLayout.offsetVector; -import static org.apache.arrow.vector.schema.VectorLayout.typeVector; -import static org.apache.arrow.vector.schema.VectorLayout.validityVector; import java.util.ArrayList; import java.util.Collections; @@ -64,7 +58,7 @@ public static TypeLayout getTypeLayout(final ArrowType arrowType) { @Override public TypeLayout visit(Int type) { - return newFixedWidthTypeLayout(dataVector(type.getBitWidth())); + return newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth())); } @Override @@ -74,14 +68,14 @@ public TypeLayout visit(Union type) { case Dense: vectors = asList( // TODO: validate this - validityVector(), - typeVector(), - offsetVector() // offset to find the vector + VectorLayout.validityVector(), + VectorLayout.typeVector(), + VectorLayout.offsetVector() // offset to find the vector ); break; case Sparse: vectors = asList( - typeVector() // type of the value at the index or 0 if null + VectorLayout.typeVector() // type of the value at the index or 0 if null ); break; default: @@ -93,21 +87,21 @@ public TypeLayout visit(Union type) { @Override public TypeLayout visit(Struct type) { List vectors = asList( - validityVector() + VectorLayout.validityVector() ); return new TypeLayout(vectors); } @Override public TypeLayout visit(Timestamp type) { - return newFixedWidthTypeLayout(dataVector(64)); + return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); } @Override public TypeLayout visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) { List vectors = asList( - validityVector(), - offsetVector() + VectorLayout.validityVector(), + VectorLayout.offsetVector() ); return new TypeLayout(vectors); } @@ -115,7 +109,7 @@ public TypeLayout visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) @Override public TypeLayout visit(FixedSizeList type) { List vectors = asList( - validityVector() + VectorLayout.validityVector() ); return new TypeLayout(vectors); } @@ -136,18 +130,18 @@ public TypeLayout visit(FloatingPoint type) { default: throw new UnsupportedOperationException("Unsupported Precision: " + type.getPrecision()); } - return newFixedWidthTypeLayout(dataVector(bitWidth)); + return newFixedWidthTypeLayout(VectorLayout.dataVector(bitWidth)); } @Override public TypeLayout visit(Decimal type) { // TODO: check size - return newFixedWidthTypeLayout(dataVector(64)); // actually depends on the type fields + return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); // actually depends on the type fields } @Override public TypeLayout visit(Bool type) { - return newFixedWidthTypeLayout(booleanVector()); + return newFixedWidthTypeLayout(VectorLayout.booleanVector()); } @Override @@ -161,7 +155,7 @@ public TypeLayout visit(Utf8 type) { } private TypeLayout newVariableWidthTypeLayout() { - return newPrimitiveTypeLayout(validityVector(), offsetVector(), byteVector()); + return newPrimitiveTypeLayout(VectorLayout.validityVector(), VectorLayout.offsetVector(), VectorLayout.byteVector()); } private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) { @@ -169,7 +163,7 @@ private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) { } public TypeLayout newFixedWidthTypeLayout(VectorLayout dataVector) { - return newPrimitiveTypeLayout(validityVector(), dataVector); + return newPrimitiveTypeLayout(VectorLayout.validityVector(), dataVector); } @Override @@ -179,21 +173,21 @@ public TypeLayout visit(Null type) { @Override public TypeLayout visit(Date type) { - return newFixedWidthTypeLayout(dataVector(64)); + return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); } @Override public TypeLayout visit(Time type) { - return newFixedWidthTypeLayout(dataVector(type.getBitWidth())); + return newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth())); } @Override public TypeLayout visit(Interval type) { // TODO: check size switch (type.getUnit()) { case DAY_TIME: - return newFixedWidthTypeLayout(dataVector(64)); + return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); case YEAR_MONTH: - return newFixedWidthTypeLayout(dataVector(64)); + return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); default: throw new UnsupportedOperationException("Unknown unit " + type.getUnit()); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java similarity index 89% rename from java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java index 0871baf38ed..e4f2f98fde3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java @@ -16,12 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.schema; - -import static org.apache.arrow.vector.schema.ArrowVectorType.DATA; -import static org.apache.arrow.vector.schema.ArrowVectorType.OFFSET; -import static org.apache.arrow.vector.schema.ArrowVectorType.TYPE; -import static org.apache.arrow.vector.schema.ArrowVectorType.VALIDITY; +package org.apache.arrow.vector.ipc.message; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -30,14 +25,14 @@ public class VectorLayout implements FBSerializable { - private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(VALIDITY, 1); - private static final VectorLayout OFFSET_VECTOR = new VectorLayout(OFFSET, 32); - private static final VectorLayout TYPE_VECTOR = new VectorLayout(TYPE, 32); - private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(DATA, 1); - private static final VectorLayout VALUES_64 = new VectorLayout(DATA, 64); - private static final VectorLayout VALUES_32 = new VectorLayout(DATA, 32); - private static final VectorLayout VALUES_16 = new VectorLayout(DATA, 16); - private static final VectorLayout VALUES_8 = new VectorLayout(DATA, 8); + private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(ArrowVectorType.VALIDITY, 1); + private static final VectorLayout OFFSET_VECTOR = new VectorLayout(ArrowVectorType.OFFSET, 32); + private static final VectorLayout TYPE_VECTOR = new VectorLayout(ArrowVectorType.TYPE, 32); + private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(ArrowVectorType.DATA, 1); + private static final VectorLayout VALUES_64 = new VectorLayout(ArrowVectorType.DATA, 64); + private static final VectorLayout VALUES_32 = new VectorLayout(ArrowVectorType.DATA, 32); + private static final VectorLayout VALUES_16 = new VectorLayout(ArrowVectorType.DATA, 16); + private static final VectorLayout VALUES_8 = new VectorLayout(ArrowVectorType.DATA, 8); public static VectorLayout typeVector() { return TYPE_VECTOR; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java deleted file mode 100644 index 5b6300076b6..00000000000 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.vector.stream; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; - -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.file.ArrowReader; -import org.apache.arrow.vector.file.ReadChannel; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.types.pojo.Schema; - -/** - * This classes reads from an input stream and produces ArrowRecordBatches. - */ -public class ArrowStreamReader extends ArrowReader { - - /** - * Constructs a streaming read, reading bytes from 'in'. Non-blocking. - * - * @param in the stream to read from - * @param allocator to allocate new buffers - */ - public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) { - super(new ReadChannel(in), allocator); - } - - public ArrowStreamReader(InputStream in, BufferAllocator allocator) { - this(Channels.newChannel(in), allocator); - } - - /** - * Reads the schema message from the beginning of the stream. - * - * @param in to allocate new buffers - * @return the deserialized arrow schema - */ - @Override - protected Schema readSchema(ReadChannel in) throws IOException { - return MessageSerializer.deserializeSchema(in); - } - - @Override - protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException { - return MessageSerializer.deserializeMessageBatch(in, allocator); - } -} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java index eba149bf79f..574612833cd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java @@ -43,8 +43,8 @@ import org.apache.arrow.flatbuf.Type; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.schema.TypeLayout; -import org.apache.arrow.vector.schema.VectorLayout; +import org.apache.arrow.vector.ipc.message.VectorLayout; +import org.apache.arrow.vector.ipc.message.TypeLayout; import org.apache.arrow.vector.types.pojo.ArrowType.Int; public class Field { @@ -117,9 +117,9 @@ public static Field convertField(org.apache.arrow.flatbuf.Field field) { } dictionary = new DictionaryEncoding(dictionaryFB.id(), dictionaryFB.isOrdered(), indexType); } - ImmutableList.Builder layout = ImmutableList.builder(); + ImmutableList.Builder layout = ImmutableList.builder(); for (int i = 0; i < field.layoutLength(); ++i) { - layout.add(new org.apache.arrow.vector.schema.VectorLayout(field.layout(i))); + layout.add(new VectorLayout(field.layout(i))); } ImmutableList.Builder childrenBuilder = ImmutableList.builder(); for (int i = 0; i < field.childrenLength(); i++) { diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index c7ee202f946..f51a8743676 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -17,7 +17,6 @@ */ package org.apache.arrow.vector; -import org.apache.arrow.vector.holders.VarCharHolder; import org.apache.arrow.vector.util.OversizedAllocationException; import static org.apache.arrow.vector.TestUtils.newNullableVarBinaryVector; @@ -38,15 +37,14 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.schema.TypeLayout; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.TypeLayout; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.TransferPair; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 3853eecac05..e61dbecf441 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -39,8 +39,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java similarity index 99% rename from java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java index 874ba99e20f..233b682c99b 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.math.BigDecimal; import java.math.BigInteger; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java similarity index 94% rename from java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java index f968768f5e6..239d3034ad1 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.stream; +package org.apache.arrow.vector.ipc; import static java.util.Arrays.asList; import static org.junit.Assert.assertArrayEquals; @@ -33,12 +33,11 @@ import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ReadChannel; -import org.apache.arrow.vector.file.WriteChannel; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowMessage; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.types.pojo.Field; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java similarity index 94% rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java index 8559969a204..4387db03655 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -40,16 +40,14 @@ import org.apache.arrow.vector.NullableIntVector; import org.apache.arrow.vector.NullableTinyIntVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; -import org.apache.arrow.vector.schema.ArrowBuffer; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.stream.ArrowStreamReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; -import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowBuffer; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -108,52 +106,41 @@ public void testWriteRead() throws IOException { // read try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); FileInputStream fileInputStream = new FileInputStream(file); - ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator) { - @Override - protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException { - ArrowMessage message = super.readMessage(in, allocator); - if (message != null) { - ArrowRecordBatch batch = (ArrowRecordBatch) message; - List buffersLayout = batch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); - } - } - return message; - } - }) { - Schema schema = arrowReader.getVectorSchemaRoot().getSchema(); - LOGGER.debug("reading schema: " + schema); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + VectorUnloader unloader = new VectorUnloader(root); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { arrowReader.loadRecordBatch(rbBlock); Assert.assertEquals(count, root.getRowCount()); + ArrowRecordBatch batch = unloader.getRecordBatch(); + List buffersLayout = batch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } validateContent(count, root); + batch.close(); } } // Read from stream. try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); - ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator) { - @Override - protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException { - ArrowMessage message = super.readMessage(in, allocator); - if (message != null) { - ArrowRecordBatch batch = (ArrowRecordBatch) message; - List buffersLayout = batch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); - } - } - return message; - } - }) { + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + VectorUnloader unloader = new VectorUnloader(root); Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); Assert.assertTrue(arrowReader.loadNextBatch()); + ArrowRecordBatch batch = unloader.getRecordBatch(); + List buffersLayout = batch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + batch.close(); Assert.assertEquals(count, root.getRowCount()); validateContent(count, root); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java similarity index 93% rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java index 46124653231..235e8c16467 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @@ -27,6 +27,8 @@ import java.util.List; import org.apache.arrow.flatbuf.Footer; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowFooter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java similarity index 85% rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java index 3ce01a26835..49e194b51b8 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import static java.nio.channels.Channels.newChannel; import static java.util.Arrays.asList; @@ -37,9 +37,15 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.TestUtils; +import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.SeekableReadChannel; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -87,7 +93,10 @@ public void test() throws IOException { ArrowFileWriter writer = new ArrowFileWriter(root, null, newChannel(out))) { ArrowBuf validityb = buf(validity); ArrowBuf valuesb = buf(values); - writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); + ArrowRecordBatch batch = new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)); + VectorLoader loader = new VectorLoader(root); + loader.load(batch); + writer.writeBatch(); } byte[] byteArray = out.toByteArray(); @@ -100,7 +109,9 @@ public void test() throws IOException { // TODO: dictionaries List recordBatches = reader.getRecordBlocks(); assertEquals(1, recordBatches.size()); - ArrowRecordBatch recordBatch = (ArrowRecordBatch) reader.readMessage(channel, allocator); + reader.loadNextBatch(); + VectorUnloader unloader = new VectorUnloader(reader.getVectorSchemaRoot()); + ArrowRecordBatch recordBatch = unloader.getRecordBatch(); List nodes = recordBatch.getNodes(); assertEquals(1, nodes.size()); ArrowFieldNode node = nodes.get(0); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java similarity index 88% rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java index c7e34191a3f..7a8586a9e70 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; -import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -27,16 +26,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import io.netty.buffer.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.NullableTinyIntVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.stream.ArrowStreamReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; -import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.BaseFileTest; +import org.apache.arrow.vector.ipc.MessageSerializerTest; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Assert; import org.junit.Test; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java similarity index 78% rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java index f393733cc90..65e6cea2ec2 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file; +package org.apache.arrow.vector.ipc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -30,10 +30,9 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.NullableTinyIntVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.schema.ArrowMessage; -import org.apache.arrow.vector.stream.ArrowStreamReader; -import org.apache.arrow.vector.stream.ArrowStreamWriter; -import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.MessageSerializerTest; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Assert; import org.junit.Test; @@ -95,37 +94,28 @@ private final class ReaderThread extends Thread { public ReaderThread(ReadableByteChannel sourceChannel) throws IOException { reader = new ArrowStreamReader(sourceChannel, alloc) { - @Override - protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException { - // Read all the batches. Each batch contains an incrementing id and then some - // constant data. Verify both. - ArrowMessage message = super.readMessage(in, allocator); - if (message == null) { - done = true; - } else { - batchesRead++; - } - return message; - } @Override public boolean loadNextBatch() throws IOException { - if (!super.loadNextBatch()) { + if (super.loadNextBatch()) { + batchesRead++; + } else { + done = true; return false; } - if (!done) { - VectorSchemaRoot root = getVectorSchemaRoot(); - Assert.assertEquals(16, root.getRowCount()); - NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0); - Assert.assertEquals((byte) (batchesRead - 1), vector.get(0)); - for (int i = 1; i < 16; i++) { - if (i < 8) { - Assert.assertEquals((byte) (i + 1), vector.get(i)); - } else { - Assert.assertTrue(vector.isNull(i)); - } + + VectorSchemaRoot root = getVectorSchemaRoot(); + Assert.assertEquals(16, root.getRowCount()); + NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0); + Assert.assertEquals((byte) (batchesRead - 1), vector.get(0)); + for (int i = 1; i < 16; i++) { + if (i < 8) { + Assert.assertEquals((byte) (i + 1), vector.get(i)); + } else { + Assert.assertTrue(vector.isNull(i)); } } + return true; } }; @@ -139,7 +129,7 @@ public void run() { reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(), reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0); while (!done) { - assertTrue(reader.loadNextBatch()); + assertTrue(reader.loadNextBatch() != done); } reader.close(); } catch (IOException e) { diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java similarity index 99% rename from java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java index 5c4c48cd26b..c3e0b795174 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.file.json; +package org.apache.arrow.vector.ipc; import java.io.File; import java.io.IOException; @@ -28,7 +28,6 @@ import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; -import org.apache.arrow.vector.file.BaseFileTest; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Validator; import org.junit.Assert;