feat: Add JNI-based Hadoop FileSystem support for S3 and other Hadoop-compatible stores #1992
drexler-sky wants to merge 7 commits into apache:main
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1992 +/- ##
============================================
+ Coverage 56.12% 58.01% +1.88%
- Complexity 976 1152 +176
============================================
Files 119 134 +15
Lines 11743 13095 +1352
Branches 2251 2432 +181
============================================
+ Hits 6591 7597 +1006
- Misses 4012 4264 +252
- Partials 1140 1234 +94

☔ View full report in Codecov by Sentry.
parthchandra
left a comment
Thanks @drexler-sky for this PR! I've been working on something similar, and there is overlap as well as some differences in our respective approaches. In particular, I am looking at how to integrate with the native_iceberg_compat mode, as well as reusing the SeekableInputStream that `native_iceberg_compat` already has.
I'll put my code in github in a day or so and we can discuss how to get to an implementation that works for both paths.
 * @param handle
 */
public static native void closeRecordBatchReader(long handle);
Can we move these to a new file, say JniHDFSBridge?
prost = "0.13.5"
jni = "0.21"
snap = "1.1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Maybe use the dependency defined in the main Cargo.toml (i.e. make this a workspace dependency)?
/// # Returns
/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError`
/// if the operation fails.
pub fn call_get_length(
Just `get_length`, perhaps?

Changed to `get_length`.
/// # Returns
/// Returns `Ok(Vec<u8>)` containing the requested bytes on success, or an
/// `ObjectStoreError` if the operation fails.
pub fn call_read(
Just `read`, or maybe `read_range`?
&self,
_prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta, ObjectStoreError>> {
    futures::stream::empty().boxed()
Did you intend to return an empty stream here?
Changed to `todo!()`.
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
    parse_hdfs_url(&url)
} else if scheme == "s3" && use_jni {
Ah, this is a tricky bit. If use_jni is true, we should be able to use any file system available to Hadoop. We really need to scan the config for every `spark.hadoop.fs.customfs.impl` entry and register the JNI-based object store for each customfs.
Thanks! I updated the code to scan for spark.hadoop.fs.<scheme>.impl configs and use JniObjectStore for any matching scheme when use_jni is true.
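The scanning step described above could be sketched as follows. This is a minimal illustration using only the standard library; `hadoop_schemes` and the plain `HashMap` standing in for the Spark conf are hypothetical names, not the actual code in this PR.

```rust
use std::collections::{HashMap, HashSet};

/// Collect every file-system scheme for which the Spark session defines a
/// Hadoop implementation via `spark.hadoop.fs.<scheme>.impl`.
/// `configs` stands in for the Spark conf passed down to the native side.
fn hadoop_schemes(configs: &HashMap<String, String>) -> HashSet<String> {
    configs
        .keys()
        .filter_map(|key| {
            key.strip_prefix("spark.hadoop.fs.")
                .and_then(|rest| rest.strip_suffix(".impl"))
        })
        // A scheme name should not itself contain a dot; this guards against
        // accidentally matching deeper keys under the `fs.` namespace.
        .filter(|scheme| !scheme.contains('.'))
        .map(|scheme| scheme.to_string())
        .collect()
}

fn main() {
    let mut conf = HashMap::new();
    conf.insert(
        "spark.hadoop.fs.customfs.impl".to_string(),
        "com.example.CustomFileSystem".to_string(),
    );
    conf.insert("spark.executor.memory".to_string(), "4g".to_string());
    println!("{:?}", hadoop_schemes(&conf));
}
```

At lookup time, `use_jni && hadoop_schemes.contains(scheme)` then decides whether a URL is routed through the JNI object store.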
@@ -0,0 +1,332 @@
// Licensed to the Apache Software Foundation (ASF) under one
Can we name this jni_hdfs.rs since this is a jni based implementation of an hdfs object store?
parthchandra
left a comment
Thanks for making the changes @drexler-sky. I did a more detailed pass and have some more comments.
static JVM: OnceCell<JavaVM> = OnceCell::new();

pub fn init_jvm(env: &JNIEnv) {
    let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM"));
}

fn get_jni_env<'a>() -> jni::AttachGuard<'a> {
    JVM.get()
        .expect("JVM not initialized")
        .attach_current_thread()
        .expect("Failed to attach thread")
}
We are already executing this code in a JVM, and the JNIEnv is available to us in `Java_org_apache_comet_Native_executePlan` (jni_api.rs). We could probably pass that in all the way here.
    Ok(jmap)
}

pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError {
jvm_bridge/mod.rs has a `jni_map_error` macro. It also has the very useful `jni_call` and `jni_static_call`. Perhaps we can use those and make the code consistent?
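For illustration, a `jni_map_error`-style macro amounts to translating any fallible JNI call into the object-store error type at the call site. This sketch uses a hypothetical `ObjectStoreError` stand-in and a dummy fallible function so it is self-contained; the real macro in jvm_bridge/mod.rs maps into Comet's actual error types.

```rust
use std::fmt;

// Hypothetical stand-in for the real object-store error type.
#[derive(Debug)]
pub struct ObjectStoreError(pub String);

// Wrap a fallible expression and translate its error, so every JNI call
// site produces a uniform Result<_, ObjectStoreError>.
macro_rules! jni_map_error {
    ($expr:expr) => {
        $expr.map_err(|e| ObjectStoreError(format!("JNI call failed: {e}")))
    };
}

// Dummy fallible function standing in for a raw JNI call.
fn fallible(ok: bool) -> Result<i32, fmt::Error> {
    if ok { Ok(42) } else { Err(fmt::Error) }
}

fn main() {
    let good: Result<i32, ObjectStoreError> = jni_map_error!(fallible(true));
    let bad: Result<i32, ObjectStoreError> = jni_map_error!(fallible(false));
    println!("{:?} {:?}", good, bad);
}
```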
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
    parse_hdfs_url(&url)
} else if use_jni && hadoop_schemes.contains(scheme) {
I think this check should come first, even before the s3a scheme has been renamed to s3. Note that a user could override the implementation to be used for the hdfs scheme.
Either way, if the use_jni flag is set, we should use JNI if the scheme is hdfs or if the config specifies an implementation for the scheme.
Also, we can rename s3a to s3 only if use_jni is false; this way we won't be renaming s3a back and forth.
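The resolution order suggested in this comment could look roughly like the sketch below. All names (`StoreChoice`, `choose_store`, `configured`) are illustrative, not the PR's actual code: the JNI check runs first, and s3a is renamed to s3 only on the non-JNI path.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum StoreChoice {
    Jni,        // route through the JNI-based Hadoop FileSystem
    NativeS3,   // native object_store S3 implementation
    NativeHdfs, // existing fs-hdfs based store
    Other,
}

/// Resolve which store to use. The JNI path is checked before any scheme
/// renaming; s3a becomes s3 only when use_jni is false, so we never rename
/// back and forth.
fn choose_store(scheme: &str, use_jni: bool, configured: &HashSet<String>) -> (StoreChoice, String) {
    if use_jni && (scheme == "hdfs" || configured.contains(scheme)) {
        return (StoreChoice::Jni, scheme.to_string());
    }
    let scheme = if !use_jni && scheme == "s3a" { "s3" } else { scheme };
    match scheme {
        "s3" => (StoreChoice::NativeS3, scheme.to_string()),
        "hdfs" => (StoreChoice::NativeHdfs, scheme.to_string()),
        _ => (StoreChoice::Other, scheme.to_string()),
    }
}

fn main() {
    let mut configured = HashSet::new();
    configured.insert("s3a".to_string());
    println!("{:?}", choose_store("s3a", true, &configured));
    println!("{:?}", choose_store("s3a", false, &configured));
}
```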
test("Comet uses JNI object store when use_jni is true") {
  spark.conf.set("spark.comet.use_jni_object_store", "true")
  runParquetScanAndAssert()
Is there a way we can verify the jni implementation did get called?
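One way to make this observable, purely as an assumption for discussion: the native JNI store could bump a process-wide counter on each read, which a test snapshots before and after a scan. `JNI_READ_CALLS` and the functions around it are hypothetical instrumentation, not code in this PR.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical instrumentation: a counter the JNI object store increments
// on every read, exposed so a test can verify the JNI path was exercised.
static JNI_READ_CALLS: AtomicU64 = AtomicU64::new(0);

// Would be called from the JNI store's read path.
fn record_jni_read() {
    JNI_READ_CALLS.fetch_add(1, Ordering::Relaxed);
}

fn jni_read_calls() -> u64 {
    JNI_READ_CALLS.load(Ordering::Relaxed)
}

fn main() {
    let before = jni_read_calls();
    record_jni_read();
    assert!(jni_read_calls() > before);
    println!("reads observed: {}", jni_read_calls());
}
```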
@comphead @Kontinuation you might be interested in looking at this PR.
comphead
left a comment
Thanks @drexler-sky for the PR
Please help me understand how this object store is different from https://github.com/apache/datafusion-comet/blob/main/native/hdfs/src/object_store/hdfs.rs?
I took a look at the …
BTW, is there any concern enabling hdfs support by default and switching the default fs-hdfs dependency? (See datafusion-comet/native/hdfs/Cargo.toml, lines 34 to 37 in d885f4a.)
Thanks @Kontinuation, I was about to create a PR to enable hdfs support by default. cc @kazuyukitanimura.
Regarding configuring the Hadoop client from the Rust side: I used the command line, so Rust was able to get the Hadoop client configurations. But if it could be improved, feel free to extend the …
Thank you for the comments. That all makes sense to me. Here’s the plan I propose:
While it's currently possible to set configurations via environment variables or spark.hadoop.*, I believe enabling a more explicit and programmatic approach will improve flexibility and user experience. Does this approach sound reasonable to everyone? If so, I'll start with step 1 and submit a PR to arrow-rs once it's ready.
I did not realize that …
I have a couple of use cases in mind that I'm hoping this will cover:
I'm assuming that as long as we are using libhdfs, these cases are covered?
Thanks @drexler-sky, I think this approach makes a lot of sense. You probably also want to cover the case where multiple config sources are set (env var, spark.hadoop, and programmatic) and decide which one should override the others. @parthchandra I'm not sure if with or without …
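The precedence question above could be resolved with a simple layered merge. The ordering here (programmatic over spark.hadoop.* over environment-derived defaults) is an assumption for illustration, as is the `merge_configs` name; the actual precedence is exactly what this thread is debating.

```rust
use std::collections::HashMap;

/// Merge Hadoop client settings from three sources. Later layers win:
/// env defaults < spark.hadoop.* < programmatic settings (assumed order).
fn merge_configs(
    env: &HashMap<String, String>,
    spark_hadoop: &HashMap<String, String>,
    programmatic: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = env.clone();
    merged.extend(spark_hadoop.clone()); // overrides env defaults
    merged.extend(programmatic.clone()); // overrides everything
    merged
}

fn main() {
    let mut env = HashMap::new();
    env.insert("fs.defaultFS".to_string(), "hdfs://envhost".to_string());
    let mut spark = HashMap::new();
    spark.insert("fs.defaultFS".to_string(), "hdfs://sparkhost".to_string());
    let mut prog = HashMap::new();
    prog.insert("fs.defaultFS".to_string(), "hdfs://proghost".to_string());
    println!("{:?}", merge_configs(&env, &spark, &prog).get("fs.defaultFS"));
}
```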
Hope so; my understanding is …
@comphead were you able to run this on a system with no hadoop/hdfs installed? I have an old version of hadoop on my mac and …
Update on this:
With the above addressed, and with appropriate modifications to Comet to always use the hdfs object store, I was able to access an … I'll verify with a custom credentials provider as well. So it appears that using the …
There are some problems with the approach of using fs-hdfs (libhdfs).

Problems

1. Linking against libjvm.so

As we discovered before, using fs-hdfs makes … Here are the problems with linking against libjvm.so:

Linux: …
macOS: We have to point …

2. Problem with …
I did not run into this issue when trying to use fs-hdfs with S3. I did have to add S3 support to fs-hdfs (I'll submit a PR for this) and hit a few minor issues, but nothing major. I do recall we had the requirement to have libjvm in the library load path early on, but I don't recall what changed to remove the requirement.
With @drexler-sky and @Kontinuation's changes we might be in a better position to evaluate this approach in a production environment.
+1
I found another problem when setting up a GitHub Workflow for fs-hdfs:
The …
UPDATE: The CI failure is not caused by incomplete reads, but by hardcoding the value of O_APPEND for a specific platform instead of defining it in a portable way. But my point still holds.
Yes, I saw this too. I will have a PR ready today.
Should we set this to draft as we discuss the design going forward?
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
Which issue does this PR close?
Closes #.
Rationale for this change
This PR adds support for Approach 2 (JNI-based Hadoop FileSystem access) to enable S3 reads via the native DataFusion Parquet scanner. The original discussion of both approaches can be found in issue #1766.
What changes are included in this PR?
Added JNI-based integration for accessing Hadoop FileSystem in Comet
Introduced a new config flag: spark.comet.use_jni_object_store to toggle this feature

How are these changes tested?
new test