Review notes (free text ahead of the first "diff --git" header; not part of any hunk):
  * FileSystemFromUriReal now reads its own parameter `file_system_java`; the
    earlier revision referenced the undeclared name `filesystem_java`.
  * The JNI entry point keeps the global reference in heap storage owned by the
    shared_ptr instead of pointing the shared_ptr at a stack local.
  * Template arguments and blank context lines were missing from the flattened
    copy of this patch and have been restored from context. Confirm them against
    the tree before applying; the "index" hashes were not regenerated.

diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index 2aca85e6adc..28e29412b65 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -217,7 +217,8 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
   // ARROW-12481.
   std::string internal_path;
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
-                        fs::FileSystemFromUri(uri, &internal_path))
+                        fs::FileSystemFromUriAndFs(uri, &internal_path,
+                                                   options.file_system_java))
   ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
   if (file_info.IsDirectory()) {
     fs::FileSelector selector;
diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h
index 238b33e40fe..ad4745bbebd 100644
--- a/cpp/src/arrow/dataset/discovery.h
+++ b/cpp/src/arrow/dataset/discovery.h
@@ -192,6 +192,12 @@ struct FileSystemFactoryOptions {
       ".",
       "_",
   };
+
+  /// Optional handle to a filesystem that already exists in the calling Java
+  /// context. When non-null, the URI-based FileSystemDatasetFactory::Make()
+  /// forwards it to fs::FileSystemFromUriAndFs() so that "hdfs"/"viewfs" URIs
+  /// use it instead of opening a new connection.
+  std::shared_ptr<void> file_system_java = nullptr;
 };
 
 /// \brief FileSystemDatasetFactory creates a Dataset from a vector of
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 9ecc4610f38..a85431567a0 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -680,7 +680,8 @@ namespace {
 Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
                                                           const std::string& uri_string,
                                                           const io::IOContext& io_context,
-                                                          std::string* out_path) {
+                                                          std::string* out_path,
+                                                          std::shared_ptr<void> file_system_java) {
   const auto scheme = uri.scheme();
 
   if (scheme == "file") {
@@ -703,6 +704,7 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
   if (scheme == "hdfs" || scheme == "viewfs") {
 #ifdef ARROW_HDFS
     ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri));
+    options.connection_config.filesystem_java = file_system_java;
     if (out_path != nullptr) {
       *out_path = uri.path();
     }
@@ -736,6 +738,14 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
   return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string);
 }
 
+// Overload without a pre-existing filesystem handle; passes a null handle on.
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
+                                                          const std::string& uri_string,
+                                                          const io::IOContext& io_context,
+                                                          std::string* out_path) {
+  return FileSystemFromUriReal(uri, uri_string, io_context, out_path, nullptr);
+}
+
 }  // namespace
 
 Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
@@ -743,6 +753,14 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_str
   return FileSystemFromUri(uri_string, io::default_io_context(), out_path);
 }
 
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndFs(
+    const std::string& uri_string, std::string* out_path,
+    std::shared_ptr<void> file_system_java) {
+  ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string))
+  return FileSystemFromUriReal(fsuri, uri_string, io::default_io_context(), out_path,
+                               file_system_java);
+}
+
 Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
                                                       const io::IOContext& io_context,
                                                       std::string* out_path) {
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index 559f1335f12..0ed4114d817 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -484,6 +484,21 @@ ARROW_EXPORT
 Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
                                                       std::string* out_path = NULLPTR);
 
+/// \brief Create a new FileSystem by URI, reusing an existing filesystem handle
+///
+/// The URI is resolved with the default IO context. For "hdfs" and "viewfs"
+/// URIs, file_system_java is copied into the HDFS connection config; no other
+/// scheme reads it.
+///
+/// \param[in] uri_string a URI-based path, ex: hdfs:///some/path
+/// \param[out] out_path (optional) path inside the filesystem
+/// \param[in] file_system_java existing filesystem handle, may be null
+/// \return FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndFs(
+    const std::string& uri_string, std::string* out_path,
+    std::shared_ptr<void> file_system_java);
+
 /// \brief Create a new FileSystem by URI with a custom IO context
 ///
 /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 5d3edcd3ba6..0320bc25a14 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -380,7 +380,13 @@ class HadoopFileSystem::HadoopFileSystemImpl {
     }
 
     driver_->BuilderSetForceNewInstance(builder);
-    fs_ = driver_->BuilderConnect(builder);
+    if (config->filesystem_java == nullptr) {
+      fs_ = driver_->BuilderConnect(builder);
+    } else {
+      // The caller supplied a handle: read the hdfsFS value it points at
+      // instead of connecting through the builder.
+      fs_ = *(static_cast<hdfsFS*>(config->filesystem_java.get()));
+    }
 
     if (fs_ == nullptr) {
       return Status::IOError("HDFS connection failed");
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 46038070ae4..86d49df7943 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -90,6 +90,9 @@ struct HdfsConnectionConfig {
   std::string user;
   std::string kerb_ticket;
   std::unordered_map<std::string, std::string> extra_conf;
+  // Optional pre-existing filesystem handle. When non-null, it is dereferenced
+  // as an hdfsFS and used in place of a new BuilderConnect() connection.
+  std::shared_ptr<void> filesystem_java = nullptr;
 };
 
 class ARROW_EXPORT HadoopFileSystem : public FileSystem {
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index 5640bc43496..a82aeeb3942 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -601,6 +601,33 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
   JNI_METHOD_END(-1L)
 }
 
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    makeFileSystemDatasetFactory
+ * Signature: (Ljava/lang/String;ILjava/lang/Object;)J
+ */
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2ILjava_lang_Object_2(
+    JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject fs) {
+  JNI_METHOD_START
+  std::shared_ptr<arrow::dataset::FileFormat> file_format =
+      JniGetOrThrow(GetFileFormat(file_format_id));
+  arrow::dataset::FileSystemFactoryOptions options;
+  if (fs != nullptr) {
+    // Keep the global reference in heap storage owned by the shared_ptr. A
+    // shared_ptr must not be handed the address of a stack local: the storage
+    // dies with this frame and the default deleter would call delete on it.
+    // NOTE(review): nothing calls DeleteGlobalRef on this reference yet.
+    options.file_system_java = std::make_shared<jobject>(env->NewGlobalRef(fs));
+  }
+
+  std::shared_ptr<arrow::dataset::DatasetFactory> d =
+      JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
+          JStringToCString(env, uri), file_format, options));
+  return CreateNativeRef(d);
+  JNI_METHOD_END(-1L)
+}
+
 /*
  * Class:     org_apache_arrow_dataset_file_JniWrapper
  * Method:    makeFileSystemDatasetFactory
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index c3a1a4e58a1..813a40fd4aa 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -45,6 +45,18 @@ private JniWrapper() {
    */
   public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
 
+  /**
+   * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to
+   * an intermediate shared_ptr of the factory instance.
+   *
+   * @param uri file uri to read, either a file or a directory
+   * @param fileFormat file format ID
+   * @param fs an existing file system object from the Java context, passed to the native side
+   * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
+   * @see FileFormat
+   */
+  public native long makeFileSystemDatasetFactory(String uri, int fileFormat, Object fs);
+
   /**
    * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
    * intermediate shared_ptr of the factory instance.