From 176c2acb804f5cc6c6927e14dcd4284326e0668c Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Mon, 23 Jun 2025 13:37:43 -0700
Subject: [PATCH 1/4] Support hadoop s3a config in native_iceberg_compat

---
 .../java/org/apache/comet/parquet/Native.java |  4 +-
 .../comet/parquet/NativeBatchReader.java      |  7 +++-
 .../comet/objectstore/NativeConfig.scala      |  0
 docs/source/user-guide/datasources.md         |  7 +---
 native/core/src/parquet/mod.rs                | 37 +++++++++++++++++--
 native/core/src/parquet/parquet_support.rs    |  8 ----
 .../parquet/ParquetReadFromS3Suite.scala      |  3 -
 7 files changed, 44 insertions(+), 22 deletions(-)
 rename {spark => common}/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala (100%)

diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index 9070487ffa..c9ad447fd2 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -20,6 +20,7 @@ package org.apache.comet.parquet;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.comet.NativeBase;
 
@@ -258,7 +259,8 @@ public static native long initRecordBatchReader(
       byte[] dataSchema,
       String sessionTimezone,
       int batchSize,
-      boolean caseSensitive);
+      boolean caseSensitive,
+      Map<String, String> objectStoreOptions);
 
   // arrow native version of read batch
   /**
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index b57746c17d..ec22c8e4db 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -75,6 +75,7 @@ import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
+import org.apache.comet.objectstore.NativeConfig;
 import org.apache.comet.shims.ShimBatchReader;
 import org.apache.comet.shims.ShimFileFormat;
 import org.apache.comet.vector.CometVector;
@@ -253,6 +254,9 @@ public void init() throws Throwable {
     }
     ParquetReadOptions readOptions = builder.build();
 
+    Map<String, String> objectStoreOptions =
+        JavaConverters.mapAsJavaMap(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+
     // TODO: enable off-heap buffer when they are ready
     ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
 
@@ -420,7 +424,8 @@ public void init() throws Throwable {
               serializedDataArrowSchema,
               timeZoneId,
               batchSize,
-              caseSensitive);
+              caseSensitive,
+              objectStoreOptions);
     }
     isInitialized = true;
   }
diff --git a/spark/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala b/common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
similarity index 100%
rename from spark/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
rename to common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md
index e6a5509261..bd28297f91 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -163,9 +163,9 @@ DataFusion Comet has [multiple Parquet scan implementations](./compatibility.md#
 
 The default `native_comet` Parquet scan implementation reads data from S3 using the [Hadoop-AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html), which is identical to the approach commonly used with vanilla Spark.
 AWS credential configuration and other Hadoop S3A configurations work the same way as in vanilla Spark.
 
-### `native_datafusion`
+### `native_datafusion` and `native_iceberg_compat`
 
-The `native_datafusion` Parquet scan implementation completely offloads data loading to native code. It uses the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and supports configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
+The `native_datafusion` and `native_iceberg_compat` Parquet scan implementations completely offload data loading to native code. They use the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and support configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
 
 This implementation maintains compatibility with existing Hadoop S3A configurations, so existing code will continue to work as long as the configurations are supported and can be translated without loss of functionality.
 
@@ -240,6 +240,3 @@ The S3 support of `native_datafusion` has the following limitations:
 
 2. **Custom credential providers**: Custom implementations of AWS credential providers are not supported. The implementation only supports the standard credential providers listed in the table above. We are planning to add support for custom credential providers through a JNI-based adapter that will allow calling Java credential providers from native code. See [issue #1829](https://github.com/apache/datafusion-comet/issues/1829) for more details.
-
-### `native_iceberg_compat`
-
-The `native_iceberg_compat` Parquet scan implementation does not support reading data from S3 yet, but we are working on it.
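The core idea behind the docs change above is that Hadoop `fs.s3a.*` settings are translated into `object_store` options on the native side. Below is a minimal sketch of that translation idea, assuming the `object_store` crate with its `aws` feature; the helper name `s3_from_hadoop_options` and the subset of S3A keys shown are illustrative only, not Comet's actual mapping table.

```rust
// Illustrative sketch: map a few well-known Hadoop S3A keys onto the
// object_store crate's S3 builder. NOT the mapping Comet actually uses.
use std::collections::HashMap;

use object_store::aws::{AmazonS3, AmazonS3Builder};

fn s3_from_hadoop_options(
    bucket: &str,
    options: &HashMap<String, String>,
) -> Result<AmazonS3, object_store::Error> {
    let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
    // Each recognized fs.s3a.* key is forwarded to the matching builder option.
    if let Some(v) = options.get("fs.s3a.access.key") {
        builder = builder.with_access_key_id(v);
    }
    if let Some(v) = options.get("fs.s3a.secret.key") {
        builder = builder.with_secret_access_key(v);
    }
    if let Some(v) = options.get("fs.s3a.endpoint") {
        builder = builder.with_endpoint(v);
    }
    if let Some(v) = options.get("fs.s3a.endpoint.region") {
        builder = builder.with_region(v);
    }
    builder.build()
}
```

In the actual patch, the JVM side collects the per-scheme settings via `NativeConfig.extractObjectStoreOptions` and hands them through JNI to `prepare_object_store_with_configs`, which performs the translation and registers the resulting store, as the `mod.rs` diff below shows.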
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index d5a8fa2b84..91fb00f621 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -28,6 +28,7 @@ pub mod schema_adapter;
 
 mod objectstore;
 
+use std::collections::HashMap;
 use std::task::Poll;
 use std::{boxed::Box, ptr::NonNull, sync::Arc};
 
@@ -53,7 +54,7 @@ use crate::execution::serde;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
 use crate::parquet::parquet_exec::init_datasource_exec;
-use crate::parquet::parquet_support::prepare_object_store;
+use crate::parquet::parquet_support::prepare_object_store_with_configs;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::datasource::listing::PartitionedFile;
@@ -61,7 +62,9 @@ use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
-use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
+use jni::objects::{
+    JBooleanArray, JByteArray, JLongArray, JMap, JObject, JPrimitiveArray, JString, ReleaseMode,
+};
 use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
@@ -644,6 +647,26 @@ fn get_file_groups_single_file(
     vec![groups]
 }
 
+pub fn get_object_store_options(
+    env: &mut JNIEnv,
+    map_object: JObject,
+) -> Result<HashMap<String, String>, CometError> {
+    let map = JMap::from_env(env, &map_object)?;
+    // Convert to a HashMap
+    let mut collected_map = HashMap::new();
+    // let mut iter = map.iter(&mut env)?;
+    map.iter(env).and_then(|mut iter| {
+        while let Some((key, value)) = iter.next(env)? {
+            let key_string: String = String::from(env.get_string(&JString::from(key))?);
+            let value_string: String = String::from(env.get_string(&JString::from(value))?);
+            collected_map.insert(key_string, value_string);
+        }
+        Ok(())
+    })?;
+
+    Ok(collected_map)
+}
+
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
@@ -660,6 +683,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     session_timezone: jstring,
     batch_size: jint,
     case_sensitive: jboolean,
+    object_store_options: jobject,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -672,8 +696,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             .unwrap()
             .into();
 
-        let (object_store_url, object_store_path) =
-            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
+        let object_store_config =
+            get_object_store_options(&mut env, JObject::from_raw(object_store_options))?;
+        let (object_store_url, object_store_path) = prepare_object_store_with_configs(
+            session_ctx.runtime_env(),
+            path.clone(),
+            &object_store_config,
+        )?;
 
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 00961d33c0..600706671e 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -360,14 +360,6 @@ fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_sto
     })
 }
 
-/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
-pub(crate) fn prepare_object_store(
-    runtime_env: Arc<RuntimeEnv>,
-    url: String,
-) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
-    prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
-}
-
 /// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
 /// and object store path
 pub(crate) fn prepare_object_store_with_configs(
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index e8e2cc1df2..5512c1893f 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -105,9 +105,6 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
   }
 
   test("read parquet file from MinIO") {
-    // native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
-    // under this mode.
-    assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
 

From 2af2464c38e65eb3720c18ebcf514db58d9f1505 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Mon, 23 Jun 2025 16:59:40 -0700
Subject: [PATCH 2/4] remove unused import

---
 .../scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index 5512c1893f..f94e53ed94 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -34,8 +34,6 @@ import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{col, sum}
 
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
 import software.amazon.awssdk.services.s3.S3Client

From 8e606f16c5c84bacf467bfbeb6424a903b6c8a21 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Mon, 23 Jun 2025 18:22:25 -0700
Subject: [PATCH 3/4] Add back removed function

---
 native/core/src/parquet/parquet_support.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 600706671e..5c425c6688 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -398,13 +398,23 @@ pub(crate) fn prepare_object_store_with_configs(
 
 #[cfg(test)]
 mod tests {
-    use crate::parquet::parquet_support::prepare_object_store;
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store_with_configs;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;
+    use std::collections::HashMap;
     use std::sync::Arc;
     use url::Url;
 
+    /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+    pub(crate) fn prepare_object_store(
+        runtime_env: Arc<RuntimeEnv>,
+        url: String,
+    ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+        prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
+    }
+
     #[cfg(not(feature = "hdfs"))]
     #[test]
     fn test_prepare_object_store() {

From 1a7362d7db05163cd3490d2998386ec51f00c43e Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 24 Jun 2025 15:25:19 -0700
Subject: [PATCH 4/4] remove commented code

---
 native/core/src/parquet/mod.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 91fb00f621..840e43d57d 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -654,7 +654,6 @@ pub fn get_object_store_options(
     let map = JMap::from_env(env, &map_object)?;
     // Convert to a HashMap
     let mut collected_map = HashMap::new();
-    // let mut iter = map.iter(&mut env)?;
     map.iter(env).and_then(|mut iter| {
         while let Some((key, value)) = iter.next(env)? {
             let key_string: String = String::from(env.get_string(&JString::from(key))?);
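Taken together, the four patches route a `Map<String, String>` of Hadoop options from `NativeBatchReader` through JNI into `prepare_object_store_with_configs`. A test-style sketch of exercising that entry point directly is shown below; it assumes the imports already present in the patch's test module (`Arc`, `HashMap`, `RuntimeEnv`, `Path`), and the test name, option values, and the asserted path are illustrative assumptions rather than code from the PR.

```rust
// Hypothetical test sketch: pass S3A-style options alongside an s3a:// URL,
// mirroring what the JNI layer now does for native_iceberg_compat.
#[cfg(not(feature = "hdfs"))]
#[test]
fn test_prepare_object_store_with_s3a_configs() {
    // MinIO-style settings; any supported fs.s3a.* key could appear here.
    let mut configs = HashMap::new();
    configs.insert(
        "fs.s3a.endpoint".to_string(),
        "http://127.0.0.1:9000".to_string(),
    );
    configs.insert("fs.s3a.path.style.access".to_string(), "true".to_string());

    let runtime_env = Arc::new(RuntimeEnv::default());
    let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
        runtime_env,
        "s3a://test-bucket/data/test-file.parquet".to_string(),
        &configs,
    )
    .unwrap();

    // The store is registered under the bucket's URL; the returned path is
    // expected to be relative to the bucket root.
    assert_eq!(object_store_path, Path::from("data/test-file.parquet"));
}
```

With this wiring in place, the earlier `assume(...)` guard in `ParquetReadFromS3Suite` is no longer needed, which is exactly what patches 1 and 2 remove.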