From d48991e71a4d2bacc934194a8152fc2b2c59f25c Mon Sep 17 00:00:00 2001 From: NoahFournier Date: Tue, 11 Apr 2023 10:59:15 +0100 Subject: [PATCH 1/5] Add support for multi-file datasets from Java --- java/dataset/src/main/cpp/jni_wrapper.cc | 57 ++++++++++++++++++- .../file/FileSystemDatasetFactory.java | 9 +++ .../apache/arrow/dataset/file/JniWrapper.java | 13 ++++- .../dataset/file/TestFileSystemDataset.java | 23 ++++++++ 4 files changed, 100 insertions(+), 2 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index b3b5fe18c79..e90b1484aa7 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include "arrow/array.h" #include "arrow/array/concatenate.h" @@ -24,6 +25,7 @@ #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" #include "jni_util.h" @@ -520,7 +522,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe * Signature: (Ljava/lang/String;II)J */ JNIEXPORT jlong JNICALL -Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( JNIEnv* env, jobject, jstring uri, jint file_format_id) { JNI_METHOD_START std::shared_ptr file_format = @@ -533,6 +535,59 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: ([Ljava/lang/String;I)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( + JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { + JNI_METHOD_START + 
using FsPathPair = std::pair, std::string>; + + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + + std::vector uri_vec = ToStringVector(env, uris); + + // If not all URIs, throw exception + if (!std::all_of(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri)) { + JniThrow("All sources must be valid URIs."); + } + + std::vector filesystems; + filesystems.reserve(uri_vec.size()); + std::transform(uri_vec.begin(), uri_vec.end(), std::back_inserter(filesystems), + [](const auto& s) -> FsPathPair { + std::string output_path; + auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(s, &output_path)); + return {fs, output_path}; + }); + + // If all URIs, ensure that they all share a FileSystem type + if (std::unique(filesystems.begin(), filesystems.end(), + [] (const auto& p1, const auto& p2) { + return p1.first->type_name() == p2.first->type_name(); + }) - filesystems.begin() != 1) { + JniThrow("Different filesystems are not supported in a multi-file dataset."); + } + + // Retrieve output paths + std::transform(filesystems.begin(), filesystems.end(), uri_vec.begin(), + [] (auto& p) { + return p.second; + }); + + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + filesystems[0].first, uri_vec, file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index 1268d11fe10..aa315690592 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, 
NativeMemoryPool memo super(allocator, memoryPool, createNative(format, uri)); } + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String[] uris) { + super(allocator, memoryPool, createNative(format, uris)); + } + private static long createNative(FileFormat format, String uri) { return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); } + private static long createNative(FileFormat format, String[] uris) { + return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 18560a46a5c..c3a1a4e58a1 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -25,7 +25,7 @@ public class JniWrapper { private static final JniWrapper INSTANCE = new JniWrapper(); - + public static JniWrapper get() { JniLoader.get().ensureLoaded(); return INSTANCE; @@ -45,6 +45,17 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to an + * intermediate shared_ptr of the factory instance. + * + * @param uris List of file uris to read, each path pointing to an individual file + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); + /** * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally * depends on C++ write API: FileSystemDataset::Write. 
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index b8a13937a8a..d86a5e151b8 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -101,6 +101,29 @@ public void testBaseParquetRead() throws Exception { AutoCloseables.close(factory); } + @Test + public void testMultipleParquetReadFromUris() throws Exception { + ParquetWriteSupport writeSupport1 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a"); + ParquetWriteSupport writeSupport2 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 2, "b"); + String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]"; + + ScanOptions options = new ScanOptions(1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{writeSupport1.getOutputURI(), writeSupport2.getOutputURI()}); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertScanBatchesProduced(factory, options); + assertEquals(2, datum.size()); + datum.forEach(batch -> assertEquals(1, batch.getLength())); + checkParquetReadResult(schema, expectedJsonUnordered, datum); + + AutoCloseables.close(datum); + + } + @Test public void testParquetProjectSingleColumn() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); From f00629926ac55c1e319e15ff8dc58be47f9deff4 Mon Sep 17 00:00:00 2001 From: NoahFournier Date: Wed, 12 Apr 2023 11:06:22 +0100 Subject: [PATCH 2/5] Better error message for unrecognized URI + test --- java/dataset/src/main/cpp/jni_wrapper.cc | 5 +++-- 
.../apache/arrow/dataset/file/TestFileSystemDataset.java | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index e90b1484aa7..757a5a65062 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -554,8 +554,9 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Lj std::vector uri_vec = ToStringVector(env, uris); // If not all URIs, throw exception - if (!std::all_of(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri)) { - JniThrow("All sources must be valid URIs."); + if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri); + elem != uri_vec.end()) { + JniThrow("Unrecognized file type in URI: " + *elem); } std::vector filesystems; diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index d86a5e151b8..80b0acd90e4 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -121,7 +121,15 @@ public void testMultipleParquetReadFromUris() throws Exception { checkParquetReadResult(schema, expectedJsonUnordered, datum); AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + @Test + public void testMultipleParquetInvalidUri() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"})); + Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); } @Test From f2dcdf8783b970d762b58165a6bd1c87d53d2602 Mon Sep 17 00:00:00 2001 From: 
NoahFournier Date: Thu, 13 Apr 2023 09:42:41 +0100 Subject: [PATCH 3/5] Handle empty URI array --- java/dataset/src/main/cpp/jni_wrapper.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 757a5a65062..8847127d7f8 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -552,6 +552,9 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Lj arrow::dataset::FileSystemFactoryOptions options; std::vector uri_vec = ToStringVector(env, uris); + if (uri_vec.size() == 0) { + JniThrow("No URIs provided."); + } // If not all URIs, throw exception if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri); From 3e71957d899e793b18e91afbdc7acac7fd53b13e Mon Sep 17 00:00:00 2001 From: NoahFournier Date: Thu, 13 Apr 2023 09:43:00 +0100 Subject: [PATCH 4/5] Fix lint --- .../org/apache/arrow/dataset/file/TestFileSystemDataset.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 80b0acd90e4..13cab413ae7 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -127,8 +127,8 @@ public void testMultipleParquetReadFromUris() throws Exception { @Test public void testMultipleParquetInvalidUri() throws Exception { RuntimeException exc = assertThrows(RuntimeException.class, - () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), - FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"})); + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new 
String[]{"https://example.com", "file:///test/location"})); Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); } From 8b7ffdb29cd7f13e552c586509108cc4f8218fa8 Mon Sep 17 00:00:00 2001 From: NoahFournier Date: Fri, 2 Jun 2023 12:25:33 +0100 Subject: [PATCH 5/5] Additional test, and clean up implementation --- java/dataset/src/main/cpp/jni_wrapper.cc | 33 ++++++------------- .../dataset/file/TestFileSystemDataset.java | 12 +++++++ 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 8847127d7f8..b023746f776 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -545,8 +545,6 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Lj JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { JNI_METHOD_START - using FsPathPair = std::pair, std::string>; - std::shared_ptr file_format = JniGetOrThrow(GetFileFormat(file_format_id)); arrow::dataset::FileSystemFactoryOptions options; @@ -562,32 +560,21 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Lj JniThrow("Unrecognized file type in URI: " + *elem); } - std::vector filesystems; - filesystems.reserve(uri_vec.size()); - std::transform(uri_vec.begin(), uri_vec.end(), std::back_inserter(filesystems), - [](const auto& s) -> FsPathPair { - std::string output_path; - auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(s, &output_path)); - return {fs, output_path}; - }); - - // If all URIs, ensure that they all share a FileSystem type - if (std::unique(filesystems.begin(), filesystems.end(), - [] (const auto& p1, const auto& p2) { - return p1.first->type_name() == p2.first->type_name(); - }) - filesystems.begin() != 1) { - JniThrow("Different filesystems are not supported in a multi-file dataset."); - } + std::vector output_paths; + std::string first_path; + // We 
know that uri_vec isn't empty, from the conditional above + auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path)); + output_paths.push_back(first_path); - // Retrieve output paths - std::transform(filesystems.begin(), filesystems.end(), uri_vec.begin(), - [] (auto& p) { - return p.second; + std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths), + [&](const auto& s) -> std::string { + auto result = JniGetOrThrow(fs->PathFromUri(s)); + return std::move(result); }); std::shared_ptr d = JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( - filesystems[0].first, uri_vec, file_format, options)); + std::move(fs), std::move(output_paths), file_format, options)); return CreateNativeRef(d); JNI_METHOD_END(-1L) } diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 13cab413ae7..735b3ae6110 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -124,6 +124,7 @@ public void testMultipleParquetReadFromUris() throws Exception { AutoCloseables.close(factory); } + @Test public void testMultipleParquetInvalidUri() throws Exception { RuntimeException exc = assertThrows(RuntimeException.class, @@ -132,6 +133,17 @@ public void testMultipleParquetInvalidUri() throws Exception { Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); } + @Test + public void testMultipleParquetMultipleFilesystemTypes() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"file:///test/location", "s3:///test/bucket/file" })); + Assertions.assertTrue( + exc.getMessage().startsWith("The 
filesystem expected a URI with one of the schemes (file) but received s3" + ) + ); + } + @Test public void testParquetProjectSingleColumn() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");