From a8ed61d9931419261e18a25421739d11593799dc Mon Sep 17 00:00:00 2001 From: "igor.suhorukov" Date: Sun, 7 Aug 2022 17:10:54 +0300 Subject: [PATCH 1/3] ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory --- java/dataset/src/main/cpp/jni_wrapper.cc | 2 ++ .../apache/arrow/dataset/file/FileFormat.java | 1 + .../dataset/file/TestFileSystemDataset.java | 22 ++++++++++++++++++ .../src/test/resources/osm_nodes.arrow | Bin 0 -> 2074 bytes 4 files changed, 25 insertions(+) create mode 100644 java/dataset/src/test/resources/osm_nodes.arrow diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index e96dfb8aed7..d0881639034 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -89,6 +89,8 @@ arrow::Result> GetFileFormat( switch (file_format_id) { case 0: return std::make_shared(); + case 1: + return std::make_shared(); default: std::string error_message = "illegal file format id: " + std::to_string(file_format_id); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java index 107fc2f71d2..343e458ce23 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -22,6 +22,7 @@ */ public enum FileFormat { PARQUET(0), + ARROW_IPC(1), NONE(-1); private final int id; diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 92610b1145c..462aa7fd709 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -316,6 +316,28 @@ public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exceptio AutoCloseables.close(factory); } + @Test + public void testBaseArrowIpcRead() throws Exception { + + String arrowDataURI = TestFileSystemDataset.class.getResource("/osm_nodes.arrow").toURI().toString(); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.ARROW_IPC, arrowDataURI); + ScanOptions options = new ScanOptions(100); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertSingleTaskProduced(factory, options); + assertEquals(1, datum.size()); + assertEquals(4, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals("latitude", schema.getFields().get(1).getName()); + assertEquals("longitude", schema.getFields().get(2).getName()); + assertEquals("tags", schema.getFields().get(3).getName()); + + AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + private void checkParquetReadResult(Schema schema, String expectedJson, List actual) throws IOException { final ObjectMapper json = new ObjectMapper(); diff --git a/java/dataset/src/test/resources/osm_nodes.arrow b/java/dataset/src/test/resources/osm_nodes.arrow new file mode 100644 index 0000000000000000000000000000000000000000..b30a9f1036a97781ab3c572eb933e2b4ad0dfdaf GIT binary patch literal 2074 zcmeHJO-oc^6h3!G$Lly#YAOSU6)2-=k%Ho4g6m2zQVq7SKq$*W%qc0IC}|NDL~XPW z@ekCdMI{Ld6-bmsDkVWwKcIC~i=YpCp7*2V76n0@=**n=x#vCSJ@4^++_9~zYe#pZ z$TH}IBT_7+sH~ER#KpxEDNpY^QHeovA#>j~48V@aNt|y0zrk~Yn^FoalQlqlt`xgv zhNo#FC5}cB994c^eOX?%HYb#g#E)r|5U|zQg%Go~BZBgE{91SbeLfzK_GWV#VAF5x z-$NgU-C*W8;^0Tb%7T2jQh-&FL)}Pk|1hQVKl%-uc$D!R^n%C>A!UaS^$(2X`WVms z73bC}O9>m)lDacu?hL7XXM0D?ah+R1c7K*z<0rJpz16 z*tla-fgUJF-Sd?H1Pv9LUI<4~Tl&+zz;iCn>weLet2LU{ndc=z1k+$|^RwUocR_Z0YYGMjo4O?rnnPuInZlAiV3bZx5c z(?#D~h*f4j10!QK`{a`E-I$+v{Sor)S;yHh?t2wa#&SturoL*i9k_Tf*0~M%EH^p2 zaKZN;?k;Iw0$RV!IZMN2aE3Xrd>3$jSs(Tx=h4>1p0fy*vgK^?os|Hy#?|1B;4R=5 zuZ!{~h>Oe+Oz&|2FkM!B@@M_$mMZ literal 0 HcmV?d00001 From 85f3ef9febeba548a05fda980bbdb5aebdf4f854 Mon Sep 17 00:00:00 2001 From: "igor.suhorukov" Date: Sun, 7 Aug 2022 17:10:54 +0300 Subject: [PATCH 2/3] ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) --- java/dataset/src/main/cpp/jni_wrapper.cc | 2 ++ .../apache/arrow/dataset/file/FileFormat.java | 1 + .../dataset/file/TestFileSystemDataset.java | 22 ++++++++++++++++++ .../src/test/resources/osm_nodes.arrow | Bin 0 -> 2074 bytes 4 files changed, 25 insertions(+) create mode 100644 java/dataset/src/test/resources/osm_nodes.arrow diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index e96dfb8aed7..d0881639034 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -89,6 +89,8 @@ arrow::Result> GetFileFormat( switch (file_format_id) { case 0: return std::make_shared(); + case 1: + return std::make_shared(); default: std::string error_message = "illegal file format id: " + std::to_string(file_format_id); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java index 107fc2f71d2..343e458ce23 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -22,6 +22,7 @@ */ public enum FileFormat { PARQUET(0), + ARROW_IPC(1), NONE(-1); private final int id; diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 92610b1145c..462aa7fd709 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -316,6 +316,28 @@ public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exceptio AutoCloseables.close(factory); } + @Test + public void testBaseArrowIpcRead() throws Exception { + + String arrowDataURI = TestFileSystemDataset.class.getResource("/osm_nodes.arrow").toURI().toString(); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.ARROW_IPC, arrowDataURI); + ScanOptions options = new ScanOptions(100); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertSingleTaskProduced(factory, options); + assertEquals(1, datum.size()); + assertEquals(4, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals("latitude", schema.getFields().get(1).getName()); + assertEquals("longitude", schema.getFields().get(2).getName()); + assertEquals("tags", schema.getFields().get(3).getName()); + + AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + private void checkParquetReadResult(Schema schema, String expectedJson, List actual) throws IOException { final ObjectMapper json = new ObjectMapper(); diff --git a/java/dataset/src/test/resources/osm_nodes.arrow b/java/dataset/src/test/resources/osm_nodes.arrow new file mode 100644 index 0000000000000000000000000000000000000000..b30a9f1036a97781ab3c572eb933e2b4ad0dfdaf GIT binary patch literal 2074 zcmeHJO-oc^6h3!G$Lly#YAOSU6)2-=k%Ho4g6m2zQVq7SKq$*W%qc0IC}|NDL~XPW z@ekCdMI{Ld6-bmsDkVWwKcIC~i=YpCp7*2V76n0@=**n=x#vCSJ@4^++_9~zYe#pZ z$TH}IBT_7+sH~ER#KpxEDNpY^QHeovA#>j~48V@aNt|y0zrk~Yn^FoalQlqlt`xgv zhNo#FC5}cB994c^eOX?%HYb#g#E)r|5U|zQg%Go~BZBgE{91SbeLfzK_GWV#VAF5x z-$NgU-C*W8;^0Tb%7T2jQh-&FL)}Pk|1hQVKl%-uc$D!R^n%C>A!UaS^$(2X`WVms z73bC}O9>m)lDacu?hL7XXM0D?ah+R1c7K*z<0rJpz16 z*tla-fgUJF-Sd?H1Pv9LUI<4~Tl&+zz;iCn>weLet2LU{ndc=z1k+$|^RwUocR_Z0YYGMjo4O?rnnPuInZlAiV3bZx5c z(?#D~h*f4j10!QK`{a`E-I$+v{Sor)S;yHh?t2wa#&SturoL*i9k_Tf*0~M%EH^p2 zaKZN;?k;Iw0$RV!IZMN2aE3Xrd>3$jSs(Tx=h4>1p0fy*vgK^?os|Hy#?|1B;4R=5 zuZ!{~h>Oe+Oz&|2FkM!B@@M_$mMZ literal 0 HcmV?d00001 From 9f3acbc9b486b1971d08226e2153baeadc0b6b8a Mon Sep 17 00:00:00 2001 From: "igor.suhorukov" Date: Mon, 8 Aug 2022 16:14:44 +0300 Subject: [PATCH 3/3] ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) --- .../dataset/file/TestFileSystemDataset.java | 31 ++++++++++++++---- .../src/test/resources/osm_nodes.arrow | Bin 2074 -> 0 bytes 2 files changed, 25 insertions(+), 6 deletions(-) delete mode 100644 java/dataset/src/test/resources/osm_nodes.arrow diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 462aa7fd709..2fd8a19bac1 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -46,11 +47,15 @@ import org.apache.arrow.dataset.scanner.ScanOptions; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; @@ -318,8 +323,22 @@ public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exceptio @Test public void testBaseArrowIpcRead() throws Exception { + File dataFile = TMP.newFile(); + Schema sourceSchema = new Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, true)))); + try (VectorSchemaRoot root = VectorSchemaRoot.create(sourceSchema, rootAllocator()); + FileOutputStream sink = new FileOutputStream(dataFile); + ArrowFileWriter writer = new ArrowFileWriter(root, /*dictionaryProvider=*/null, sink.getChannel())) { + IntVector ints = (IntVector) root.getVector(0); + ints.setSafe(0, 0); + ints.setSafe(1, 1024); + ints.setSafe(2, Integer.MAX_VALUE); + root.setRowCount(3); + writer.start(); + writer.writeBatch(); + writer.end(); + } - String arrowDataURI = TestFileSystemDataset.class.getResource("/osm_nodes.arrow").toURI().toString(); + String arrowDataURI = dataFile.toURI().toString(); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.ARROW_IPC, arrowDataURI); ScanOptions options = new ScanOptions(100); @@ -328,11 +347,11 @@ public void testBaseArrowIpcRead() throws Exception { assertSingleTaskProduced(factory, options); assertEquals(1, datum.size()); - assertEquals(4, schema.getFields().size()); - assertEquals("id", schema.getFields().get(0).getName()); - assertEquals("latitude", schema.getFields().get(1).getName()); - assertEquals("longitude", schema.getFields().get(2).getName()); - assertEquals("tags", schema.getFields().get(3).getName()); + assertEquals(1, schema.getFields().size()); + assertEquals("ints", schema.getFields().get(0).getName()); + + String expectedJsonUnordered = String.format("[[0],[1024],[%d]]", Integer.MAX_VALUE); + checkParquetReadResult(schema, expectedJsonUnordered, datum); AutoCloseables.close(datum); AutoCloseables.close(factory); diff --git a/java/dataset/src/test/resources/osm_nodes.arrow b/java/dataset/src/test/resources/osm_nodes.arrow deleted file mode 100644 index b30a9f1036a97781ab3c572eb933e2b4ad0dfdaf..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2074 zcmeHJO-oc^6h3!G$Lly#YAOSU6)2-=k%Ho4g6m2zQVq7SKq$*W%qc0IC}|NDL~XPW z@ekCdMI{Ld6-bmsDkVWwKcIC~i=YpCp7*2V76n0@=**n=x#vCSJ@4^++_9~zYe#pZ z$TH}IBT_7+sH~ER#KpxEDNpY^QHeovA#>j~48V@aNt|y0zrk~Yn^FoalQlqlt`xgv zhNo#FC5}cB994c^eOX?%HYb#g#E)r|5U|zQg%Go~BZBgE{91SbeLfzK_GWV#VAF5x z-$NgU-C*W8;^0Tb%7T2jQh-&FL)}Pk|1hQVKl%-uc$D!R^n%C>A!UaS^$(2X`WVms z73bC}O9>m)lDacu?hL7XXM0D?ah+R1c7K*z<0rJpz16 z*tla-fgUJF-Sd?H1Pv9LUI<4~Tl&+zz;iCn>weLet2LU{ndc=z1k+$|^RwUocR_Z0YYGMjo4O?rnnPuInZlAiV3bZx5c z(?#D~h*f4j10!QK`{a`E-I$+v{Sor)S;yHh?t2wa#&SturoL*i9k_Tf*0~M%EH^p2 zaKZN;?k;Iw0$RV!IZMN2aE3Xrd>3$jSs(Tx=h4>1p0fy*vgK^?os|Hy#?|1B;4R=5 zuZ!{~h>Oe+Oz&|2FkM!B@@M_$mMZ