Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <mutex>
#include <utility>
#include <unordered_map>

#include "arrow/array.h"
Expand All @@ -25,6 +26,7 @@
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/engine/substrait/util.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
Expand Down Expand Up @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe
* Signature: (Ljava/lang/String;I)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
JNIEnv* env, jobject, jstring uri, jint file_format_id) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
Expand All @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
JNI_METHOD_END(-1L)
}

/*
 * Class:     org_apache_arrow_dataset_file_JniWrapper
 * Method:    makeFileSystemDatasetFactory
 * Signature: ([Ljava/lang/String;I)J
 *
 * Builds a FileSystemDatasetFactory over an explicit list of file URIs and
 * returns a native reference to it (via CreateNativeRef).
 *
 * The filesystem is resolved from the first URI only; every remaining URI is
 * converted to a path with that same filesystem's PathFromUri().
 *
 * Failure cases reported through JniThrow / JniGetOrThrow:
 *   - `uris` is empty;
 *   - an entry does not satisfy arrow::fs::internal::IsLikelyUri;
 *   - any of the Arrow calls below returns an error.
 *
 * NOTE(review): JNI_METHOD_START / JNI_METHOD_END(-1L) are macros defined
 * outside this view; presumably they translate C++ exceptions into Java
 * exceptions and yield -1L on failure -- confirm against their definition.
 */
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
  JNI_METHOD_START

  std::shared_ptr<arrow::dataset::FileFormat> file_format =
      JniGetOrThrow(GetFileFormat(file_format_id));
  arrow::dataset::FileSystemFactoryOptions options;

  std::vector<std::string> uri_vec = ToStringVector(env, uris);
  if (uri_vec.empty()) {
    JniThrow("No URIs provided.");
  }

  // If not all URIs, throw exception
  if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(),
                                   arrow::fs::internal::IsLikelyUri);
      elem != uri_vec.end()) {
    JniThrow("Unrecognized file type in URI: " + *elem);
  }

  std::vector<std::string> output_paths;
  // Exactly one path is produced per input URI, so size the vector once.
  output_paths.reserve(uri_vec.size());

  std::string first_path;
  // We know that uri_vec isn't empty, from the conditional above
  auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path));
  output_paths.push_back(std::move(first_path));

  // The remaining URIs are resolved against the filesystem chosen for the
  // first one. Return the value directly: wrapping a local in std::move on
  // return would only inhibit copy elision.
  std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths),
                 [&fs](const std::string& s) -> std::string {
                   return JniGetOrThrow(fs->PathFromUri(s));
                 });

  std::shared_ptr<arrow::dataset::DatasetFactory> d =
      JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
          std::move(fs), std::move(output_paths), file_format, options));
  return CreateNativeRef(d);
  JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo
super(allocator, memoryPool, createNative(format, uri));
}

/**
 * Constructs a factory over several file URIs.
 *
 * <p>The native factory handle is obtained from {@code createNative(format, uris)} and handed,
 * together with the allocator and memory pool, to the superclass constructor.
 *
 * @param allocator  buffer allocator forwarded to the superclass
 * @param memoryPool native memory pool forwarded to the superclass
 * @param format     file format of the files to read
 * @param uris       URIs of the files to read
 */
public FileSystemDatasetFactory(
    BufferAllocator allocator,
    NativeMemoryPool memoryPool,
    FileFormat format,
    String[] uris) {
  super(allocator, memoryPool, createNative(format, uris));
}

/**
 * Asks the JNI layer to create a native dataset factory for a single URI.
 *
 * @param format file format whose {@code id()} is passed to the native call
 * @param uri    URI passed to the native call
 * @return the value returned by {@code JniWrapper#makeFileSystemDatasetFactory(String, int)}
 */
private static long createNative(FileFormat format, String uri) {
  final JniWrapper jni = JniWrapper.get();
  final int formatId = format.id();
  return jni.makeFileSystemDatasetFactory(uri, formatId);
}

/**
 * Asks the JNI layer to create a native dataset factory for an array of URIs.
 *
 * @param format file format whose {@code id()} is passed to the native call
 * @param uris   URIs passed to the native call
 * @return the value returned by {@code JniWrapper#makeFileSystemDatasetFactory(String[], int)}
 */
private static long createNative(FileFormat format, String[] uris) {
  final JniWrapper jni = JniWrapper.get();
  final int formatId = format.id();
  return jni.makeFileSystemDatasetFactory(uris, formatId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class JniWrapper {

private static final JniWrapper INSTANCE = new JniWrapper();

public static JniWrapper get() {
JniLoader.get().ensureLoaded();
return INSTANCE;
Expand All @@ -45,6 +45,17 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

/**
 * Create a FileSystemDatasetFactory from multiple file URIs and return its native pointer.
 * The pointer points to an intermediate shared_ptr of the factory instance.
 *
 * @param uris array of file URIs to read, each one pointing to an individual file
 * @param fileFormat file format ID
 * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
 * @see FileFormat
 */
public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat);

/**
* Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
* depends on C++ write API: FileSystemDataset::Write.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception {
AutoCloseables.close(factory);
}

/**
 * Writes two single-row Parquet files into separate temporary folders, builds one factory over
 * both URIs, and checks that the scan yields two one-row batches matching the expected JSON.
 */
@Test
public void testMultipleParquetReadFromUris() throws Exception {
  final String expectedRows = "[[1,\"a\"],[2,\"b\"]]";
  final ParquetWriteSupport firstFile =
      ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
  final ParquetWriteSupport secondFile =
      ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 2, "b");

  final ScanOptions scanOptions = new ScanOptions(1);
  final FileSystemDatasetFactory datasetFactory = new FileSystemDatasetFactory(
      rootAllocator(),
      NativeMemoryPool.getDefault(),
      FileFormat.PARQUET,
      new String[]{firstFile.getOutputURI(), secondFile.getOutputURI()});
  final Schema resultSchema = inferResultSchemaFromFactory(datasetFactory, scanOptions);
  final List<ArrowRecordBatch> batches = collectResultFromFactory(datasetFactory, scanOptions);

  assertScanBatchesProduced(datasetFactory, scanOptions);
  // One batch per input file, each holding that file's single row.
  assertEquals(2, batches.size());
  for (ArrowRecordBatch recordBatch : batches) {
    assertEquals(1, recordBatch.getLength());
  }
  checkParquetReadResult(resultSchema, expectedRows, batches);

  AutoCloseables.close(batches);
  AutoCloseables.close(datasetFactory);
}


/**
 * Constructing a factory whose first URI uses the {@code https} scheme is expected to fail
 * with a RuntimeException carrying the message asserted below.
 */
@Test
public void testMultipleParquetInvalidUri() throws Exception {
  final String[] uris = {"https://example.com", "file:///test/location"};

  RuntimeException thrown = assertThrows(RuntimeException.class, () -> {
    new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, uris);
  });

  Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", thrown.getMessage());
}

/**
 * Constructing a factory from URIs with different schemes ({@code file} followed by {@code s3})
 * is expected to fail with a RuntimeException whose message starts with the text asserted below.
 */
@Test
public void testMultipleParquetMultipleFilesystemTypes() throws Exception {
  final String[] uris = {"file:///test/location", "s3:///test/bucket/file"};

  RuntimeException thrown = assertThrows(RuntimeException.class, () -> {
    new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, uris);
  });

  final String message = thrown.getMessage();
  Assertions.assertTrue(
      message.startsWith("The filesystem expected a URI with one of the schemes (file) but received s3"));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to add tests for the exceptions thrown when (1) a URI is invalid, and (2) the URIs do not all share the same FileSystem type?


@Test
public void testParquetProjectSingleColumn() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
Expand Down