diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 48191eac495..cba2b4a0dbf 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include "arrow/array.h" @@ -25,6 +26,7 @@ #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" #include "arrow/engine/substrait/util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe * Signature: (Ljava/lang/String;II)J */ JNIEXPORT jlong JNICALL -Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( JNIEnv* env, jobject, jstring uri, jint file_format_id) { JNI_METHOD_START std::shared_ptr file_format = @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: ([Ljava/lang/String;I)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( + JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { + JNI_METHOD_START + + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + + std::vector uri_vec = ToStringVector(env, uris); + if (uri_vec.empty()) { + JniThrow("No URIs provided."); + } + + // Every entry must look like a URI; report the first one that does not + if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri); + elem != uri_vec.end()) { + JniThrow("Unrecognized file type in URI: " + *elem); + } + + 
std::vector<std::string> output_paths; + std::string first_path; + // We know that uri_vec isn't empty, from the conditional above + auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path)); + output_paths.push_back(std::move(first_path)); + + std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths), + [&](const auto& s) -> std::string { + // Return directly; std::move on a local would only block copy elision + return JniGetOrThrow(fs->PathFromUri(s)); + }); + + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + std::move(fs), std::move(output_paths), file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index 1268d11fe10..aa315690592 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo super(allocator, memoryPool, createNative(format, uri)); } + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String[] uris) { + super(allocator, memoryPool, createNative(format, uris)); + } + private static long createNative(FileFormat format, String uri) { return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); } + private static long createNative(FileFormat format, String[] uris) { + return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 18560a46a5c..c3a1a4e58a1 
100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -25,7 +25,7 @@ public class JniWrapper { private static final JniWrapper INSTANCE = new JniWrapper(); - + public static JniWrapper get() { JniLoader.get().ensureLoaded(); return INSTANCE; @@ -45,6 +45,17 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to an + * intermediate shared_ptr of the factory instance. + * + * @param uris Array of file URIs to read, each pointing to an individual file + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); + /** * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally * depends on C++ write API: FileSystemDataset::Write. 
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index b8a13937a8a..735b3ae6110 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception { AutoCloseables.close(factory); } + @Test + public void testMultipleParquetReadFromUris() throws Exception { /* reads two Parquet files via String[] URIs */ + ParquetWriteSupport writeSupport1 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a"); + ParquetWriteSupport writeSupport2 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 2, "b"); + String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]"; + + ScanOptions options = new ScanOptions(1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{writeSupport1.getOutputURI(), writeSupport2.getOutputURI()}); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertScanBatchesProduced(factory, options); + assertEquals(2, datum.size()); /* one record batch expected per input URI */ + datum.forEach(batch -> assertEquals(1, batch.getLength())); /* each batch expected to hold one row */ + checkParquetReadResult(schema, expectedJsonUnordered, datum); + + AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + + + @Test + public void testMultipleParquetInvalidUri() throws Exception { /* https URI is expected to be rejected */ + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"})); + Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); + } + + @Test + public void 
testMultipleParquetMultipleFilesystemTypes() throws Exception { /* file + s3 mix must fail */ + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"file:///test/location", "s3:///test/bucket/file" })); + Assertions.assertTrue( + exc.getMessage().startsWith("The filesystem expected a URI with one of the schemes (file) but received s3" + ) + ); + } + @Test public void testParquetProjectSingleColumn() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");