4 changes: 3 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/Native.java
@@ -20,6 +20,7 @@
 package org.apache.comet.parquet;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.comet.NativeBase;
 
@@ -258,7 +259,8 @@ public static native long initRecordBatchReader(
       byte[] dataSchema,
       String sessionTimezone,
       int batchSize,
-      boolean caseSensitive);
+      boolean caseSensitive,
+      Map<String, String> objectStoreOptions);
 
   // arrow native version of read batch
   /**
@@ -75,6 +75,7 @@
 
 import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
+import org.apache.comet.objectstore.NativeConfig;
 import org.apache.comet.shims.ShimBatchReader;
 import org.apache.comet.shims.ShimFileFormat;
 import org.apache.comet.vector.CometVector;
@@ -253,6 +254,9 @@ public void init() throws Throwable {
     }
     ParquetReadOptions readOptions = builder.build();
 
+    Map<String, String> objectStoreOptions =
+        JavaConverters.mapAsJavaMap(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+
     // TODO: enable off-heap buffer when they are ready
     ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
 
@@ -420,7 +424,8 @@ public void init() throws Throwable {
           serializedDataArrowSchema,
           timeZoneId,
           batchSize,
-          caseSensitive);
+          caseSensitive,
+          objectStoreOptions);
     }
     isInitialized = true;
   }
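For context, the options map handed to the native reader above is produced by `NativeConfig.extractObjectStoreOptions`. A minimal sketch of what such an extraction could look like, assuming it simply collects the Hadoop options that match the file's URI scheme; the prefix rule below is an assumption for illustration, not the actual implementation:

```scala
import java.net.URI

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

// Hypothetical sketch: gather the Hadoop options relevant to the file's
// scheme (e.g. fs.s3a.* for s3a:// paths) so they can cross the JNI
// boundary as a plain Map[String, String].
def extractObjectStoreOptions(conf: Configuration, uri: URI): Map[String, String] = {
  val prefix = s"fs.${uri.getScheme}."
  conf.iterator().asScala
    .map(entry => entry.getKey -> entry.getValue)
    .filter { case (key, _) => key.startsWith(prefix) }
    .toMap
}
```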
7 changes: 2 additions & 5 deletions docs/source/user-guide/datasources.md
@@ -163,9 +163,9 @@ DataFusion Comet has [multiple Parquet scan implementations](./compatibility.md#
 
 The default `native_comet` Parquet scan implementation reads data from S3 using the [Hadoop-AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html), which is identical to the approach commonly used with vanilla Spark. AWS credential configuration and other Hadoop S3A configurations work the same way as in vanilla Spark.
 
-### `native_datafusion`
+### `native_datafusion` and `native_iceberg_compat`
 
-The `native_datafusion` Parquet scan implementation completely offloads data loading to native code. It uses the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and supports configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
+The `native_datafusion` and `native_iceberg_compat` Parquet scan implementations completely offload data loading to native code. They use the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and support configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
 
 This implementation maintains compatibility with existing Hadoop S3A configurations, so existing code will continue to work as long as the configurations are supported and can be translated without loss of functionality.
 
@@ -240,6 +240,3 @@ The S3 support of `native_datafusion` has the following limitations:
 
 2. **Custom credential providers**: Custom implementations of AWS credential providers are not supported. The implementation only supports the standard credential providers listed in the table above. We are planning to add support for custom credential providers through a JNI-based adapter that will allow calling Java credential providers from native code. See [issue #1829](https://github.com/apache/datafusion-comet/issues/1829) for more details.
 
-### `native_iceberg_compat`
-
-The `native_iceberg_compat` Parquet scan implementation does not support reading data from S3 yet, but we are working on it.
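To make the documented translation concrete, here is a hedged Scala sketch of reading S3 data through the native scans with standard S3A settings. The endpoint, credentials, and bucket are placeholders, and `spark.comet.scan.impl` is assumed to be the config key for selecting the scan implementation:

```scala
import org.apache.spark.sql.SparkSession

// Standard Hadoop S3A settings; under native_datafusion or native_iceberg_compat,
// Comet translates these to the object_store crate's equivalents.
// Endpoint, credentials, and bucket below are placeholders.
val spark = SparkSession
  .builder()
  .config("spark.comet.scan.impl", "native_datafusion") // assumed config key
  .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
  .config("spark.hadoop.fs.s3a.access.key", "<access-key>")
  .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()

spark.read.parquet("s3a://my-bucket/data/test-file.parquet").show()
```

The same settings keep working with the default `native_comet` scan, since that path still goes through the Hadoop-AWS module.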
36 changes: 32 additions & 4 deletions native/core/src/parquet/mod.rs
@@ -28,6 +28,7 @@ pub mod schema_adapter;
 
 mod objectstore;
 
+use std::collections::HashMap;
 use std::task::Poll;
 use std::{boxed::Box, ptr::NonNull, sync::Arc};
 
@@ -53,15 +54,17 @@ use crate::execution::serde;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
 use crate::parquet::parquet_exec::init_datasource_exec;
-use crate::parquet::parquet_support::prepare_object_store;
+use crate::parquet::parquet_support::prepare_object_store_with_configs;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
-use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
+use jni::objects::{
+    JBooleanArray, JByteArray, JLongArray, JMap, JObject, JPrimitiveArray, JString, ReleaseMode,
+};
 use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
@@ -644,6 +647,25 @@ fn get_file_groups_single_file(
     vec![groups]
 }
 
+pub fn get_object_store_options(
+    env: &mut JNIEnv,
+    map_object: JObject,
+) -> Result<HashMap<String, String>, CometError> {
+    let map = JMap::from_env(env, &map_object)?;
+    // Convert to a HashMap
+    let mut collected_map = HashMap::new();
+    map.iter(env).and_then(|mut iter| {
+        while let Some((key, value)) = iter.next(env)? {
+            let key_string: String = String::from(env.get_string(&JString::from(key))?);
+            let value_string: String = String::from(env.get_string(&JString::from(value))?);
+            collected_map.insert(key_string, value_string);
+        }
+        Ok(())
+    })?;
+
+    Ok(collected_map)
+}
+
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
@@ -660,6 +682,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     session_timezone: jstring,
     batch_size: jint,
     case_sensitive: jboolean,
+    object_store_options: jobject,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -672,8 +695,13 @@
             .unwrap()
             .into();
 
-        let (object_store_url, object_store_path) =
-            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
+        let object_store_config =
+            get_object_store_options(&mut env, JObject::from_raw(object_store_options))?;
+        let (object_store_url, object_store_path) = prepare_object_store_with_configs(
+            session_ctx.runtime_env(),
+            path.clone(),
+            &object_store_config,
+        )?;
 
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;
20 changes: 11 additions & 9 deletions native/core/src/parquet/parquet_support.rs
@@ -360,14 +360,6 @@ fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_sto
     })
 }
 
-/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
-pub(crate) fn prepare_object_store(
-    runtime_env: Arc<RuntimeEnv>,
-    url: String,
-) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
-    prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
-}
-
 /// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
 /// and object store path
 pub(crate) fn prepare_object_store_with_configs(
@@ -406,13 +398,23 @@ pub(crate) fn prepare_object_store_with_configs(
 
 #[cfg(test)]
 mod tests {
-    use crate::parquet::parquet_support::prepare_object_store;
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store_with_configs;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;
+    use std::collections::HashMap;
     use std::sync::Arc;
    use url::Url;
 
+    /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+    pub(crate) fn prepare_object_store(
+        runtime_env: Arc<RuntimeEnv>,
+        url: String,
+    ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+        prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
+    }
+
     #[cfg(not(feature = "hdfs"))]
     #[test]
     fn test_prepare_object_store() {
@@ -34,8 +34,6 @@ import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{col, sum}
 
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
 import software.amazon.awssdk.services.s3.S3Client
@@ -105,9 +103,6 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
   }
 
   test("read parquet file from MinIO") {
-    // native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
-    // under this mode.
-    assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)
 
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
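With the `assume` gate removed, the MinIO test now runs under every scan implementation. The suite's MinIO fixture would be set up along these lines with the AWS SDK v2 client that the imports above bring in; a hedged sketch with placeholder endpoint, credentials, and bucket, not the suite's actual helper code:

```scala
import java.net.URI

import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}
import software.amazon.awssdk.services.s3.model.CreateBucketRequest

// Hypothetical MinIO-backed fixture: point the SDK v2 client at a local
// MinIO endpoint with static credentials and create the test bucket.
val s3: S3Client = S3Client
  .builder()
  .endpointOverride(URI.create("http://127.0.0.1:9000"))
  .region(Region.US_EAST_1)
  .credentialsProvider(
    StaticCredentialsProvider.create(AwsBasicCredentials.create("minio", "minio123")))
  .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
  .build()

s3.createBucket(CreateBucketRequest.builder().bucket("test-bucket").build())
```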