From e1d3b41b8b29bed2ee471bd13c6d0693dd2f66ca Mon Sep 17 00:00:00 2001 From: zhen wang Date: Wed, 13 Sep 2023 15:30:41 +0800 Subject: [PATCH 1/2] support using existing java file system --- cpp/src/arrow/dataset/discovery.cc | 2 +- cpp/src/arrow/dataset/discovery.h | 4 ++++ cpp/src/arrow/filesystem/filesystem.cc | 17 +++++++++++++- cpp/src/arrow/filesystem/filesystem.h | 4 ++++ cpp/src/arrow/io/hdfs.cc | 7 +++++- cpp/src/arrow/io/hdfs.h | 1 + java/dataset/src/main/cpp/jni_wrapper.cc | 22 +++++++++++++++++++ .../apache/arrow/dataset/file/JniWrapper.java | 12 ++++++++++ 8 files changed, 66 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 2aca85e6adc..0d31eacbedb 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -217,7 +217,7 @@ Result> FileSystemDatasetFactory::Make( // ARROW-12481. std::string internal_path; ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, - fs::FileSystemFromUri(uri, &internal_path)) + fs::FileSystemFromUriAndFs(uri, &internal_path, options.java_fs_global_ref)) ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)) if (file_info.IsDirectory()) { fs::FileSelector selector; diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 238b33e40fe..02977ff1faa 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -192,6 +192,10 @@ struct FileSystemFactoryOptions { ".", "_", }; + + /// when java context have a file system reference to be used + // then use this file system reference in the context + void* java_fs_global_ref = nullptr; }; /// \brief FileSystemDatasetFactory creates a Dataset from a vector of diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 9ecc4610f38..91f3cc1a064 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -680,7 +680,8 @@ namespace { Result> FileSystemFromUriReal(const Uri& uri, const std::string& uri_string, const io::IOContext& io_context, - std::string* out_path) { + std::string* out_path, + void* fsRef) { const auto scheme = uri.scheme(); if (scheme == "file") { @@ -703,6 +704,7 @@ Result> FileSystemFromUriReal(const Uri& uri, if (scheme == "hdfs" || scheme == "viewfs") { #ifdef ARROW_HDFS ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri)); + options.connection_config.fsRef = fsRef; if (out_path != nullptr) { *out_path = uri.path(); } @@ -736,6 +738,13 @@ Result> FileSystemFromUriReal(const Uri& uri, return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string); } +Result> FileSystemFromUriReal(const Uri& uri, + const std::string& uri_string, + const io::IOContext& io_context, + std::string* out_path) { + return FileSystemFromUriReal(uri, uri_string, io_context, out_path, nullptr); +} + } // namespace Result> FileSystemFromUri(const std::string& uri_string, @@ -743,6 +752,12 @@ Result> FileSystemFromUri(const std::string& uri_str return FileSystemFromUri(uri_string, io::default_io_context(), out_path); } +Result> FileSystemFromUriAndFs(const std::string& uri_string, + std::string* out_path, void* fsRef) { + ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string)) + return FileSystemFromUriReal(fsuri, uri_string, io::default_io_context(), out_path, fsRef); +} + Result> FileSystemFromUri(const std::string& uri_string, const io::IOContext& io_context, std::string* out_path) { diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 559f1335f12..a4f22e78507 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -484,6 +484,10 @@ ARROW_EXPORT Result> FileSystemFromUri(const std::string& uri, std::string* out_path = NULLPTR); +ARROW_EXPORT +Result> FileSystemFromUriAndFs(const std::string& uri_string, + std::string* out_path, void* fsRef); + /// \brief Create a new FileSystem by URI with a custom IO context /// /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 5d3edcd3ba6..d836992e36c 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -380,7 +380,12 @@ class HadoopFileSystem::HadoopFileSystemImpl { } driver_->BuilderSetForceNewInstance(builder); - fs_ = driver_->BuilderConnect(builder); + if (config -> fsRef == nullptr) { + fs_ = driver_->BuilderConnect(builder); + } else { + fs_ = *((hdfsFS* )(config -> fsRef)); + } + if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 46038070ae4..22f33c1d32c 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -90,6 +90,7 @@ struct HdfsConnectionConfig { std::string user; std::string kerb_ticket; std::unordered_map extra_conf; + void* fsRef = nullptr; }; class ARROW_EXPORT HadoopFileSystem : public FileSystem { diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 5640bc43496..42ee927f451 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -601,6 +601,28 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: (Ljava/lang/String;ILjava/lang/Object;)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2ILjava_lang_Object_2( + JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject fs) { + JNI_METHOD_START + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + jobject fsObj = env->NewGlobalRef(fs); + options.java_fs_global_ref = &fsObj; + + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + JStringToCString(env, uri), file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: makeFileSystemDatasetFactory diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index c3a1a4e58a1..813a40fd4aa 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -45,6 +45,18 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a + * intermediate shared_ptr of the factory instance. + * + * @param uri file uri to read, either a file or a directory + * @param fileFormat file format ID + * @param fs the existing java context file system + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String uri, int fileFormat, Object fs); + /** * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a * intermediate shared_ptr of the factory instance. From 94983c75bb1c339606a58af9c66717d6507e5999 Mon Sep 17 00:00:00 2001 From: zhen wang Date: Fri, 15 Sep 2023 07:45:10 +0800 Subject: [PATCH 2/2] address comments, adjust naming and types --- cpp/src/arrow/dataset/discovery.cc | 2 +- cpp/src/arrow/dataset/discovery.h | 2 +- cpp/src/arrow/filesystem/filesystem.cc | 8 ++++---- cpp/src/arrow/filesystem/filesystem.h | 3 ++- cpp/src/arrow/io/hdfs.cc | 5 ++--- cpp/src/arrow/io/hdfs.h | 2 +- java/dataset/src/main/cpp/jni_wrapper.cc | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 0d31eacbedb..28e29412b65 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -217,7 +217,7 @@ Result> FileSystemDatasetFactory::Make( // ARROW-12481. std::string internal_path; ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, - fs::FileSystemFromUriAndFs(uri, &internal_path, options.java_fs_global_ref)) + fs::FileSystemFromUriAndFs(uri, &internal_path, options.file_system_java)) ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)) if (file_info.IsDirectory()) { fs::FileSelector selector; diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 02977ff1faa..ad4745bbebd 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -195,7 +195,7 @@ struct FileSystemFactoryOptions { /// when java context have a file system reference to be used // then use this file system reference in the context - void* java_fs_global_ref = nullptr; + std::shared_ptr file_system_java = nullptr; }; /// \brief FileSystemDatasetFactory creates a Dataset from a vector of diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 91f3cc1a064..a85431567a0 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -681,7 +681,7 @@ Result> FileSystemFromUriReal(const Uri& uri, const std::string& uri_string, const io::IOContext& io_context, std::string* out_path, - void* fsRef) { + std::shared_ptr file_system_java) { const auto scheme = uri.scheme(); if (scheme == "file") { @@ -704,7 +704,7 @@ Result> FileSystemFromUriReal(const Uri& uri, if (scheme == "hdfs" || scheme == "viewfs") { #ifdef ARROW_HDFS ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri)); - options.connection_config.fsRef = fsRef; + options.connection_config.filesystem_java = filesystem_java; if (out_path != nullptr) { *out_path = uri.path(); } @@ -753,9 +753,9 @@ Result> FileSystemFromUri(const std::string& uri_str } Result> FileSystemFromUriAndFs(const std::string& uri_string, - std::string* out_path, void* fsRef) { + std::string* out_path, std::shared_ptr file_system_java) { ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string)) - return FileSystemFromUriReal(fsuri, uri_string, io::default_io_context(), out_path, fsRef); + return FileSystemFromUriReal(fsuri, uri_string, io::default_io_context(), out_path, file_system_java); } Result> FileSystemFromUri(const std::string& uri_string, diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index a4f22e78507..0ed4114d817 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -486,7 +486,8 @@ Result> FileSystemFromUri(const std::string& uri, ARROW_EXPORT Result> FileSystemFromUriAndFs(const std::string& uri_string, - std::string* out_path, void* fsRef); + std::string* out_path, + std::shared_ptr file_system_java); /// \brief Create a new FileSystem by URI with a custom IO context /// diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index d836992e36c..0320bc25a14 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -380,13 +380,12 @@ class HadoopFileSystem::HadoopFileSystemImpl { } driver_->BuilderSetForceNewInstance(builder); - if (config -> fsRef == nullptr) { + if (config -> filesystem_java == nullptr) { fs_ = driver_->BuilderConnect(builder); } else { - fs_ = *((hdfsFS* )(config -> fsRef)); + fs_ = *(static_cast(config -> filesystem_java.get())); } - if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); } diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 22f33c1d32c..86d49df7943 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -90,7 +90,7 @@ struct HdfsConnectionConfig { std::string user; std::string kerb_ticket; std::unordered_map extra_conf; - void* fsRef = nullptr; + std::shared_ptr filesystem_java = nullptr; }; class ARROW_EXPORT HadoopFileSystem : public FileSystem { diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 42ee927f451..a82aeeb3942 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -614,7 +614,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav JniGetOrThrow(GetFileFormat(file_format_id)); arrow::dataset::FileSystemFactoryOptions options; jobject fsObj = env->NewGlobalRef(fs); - options.java_fs_global_ref = &fsObj; + options.file_system_java = &fsObj; std::shared_ptr d = JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(