From f5c800e0ec4392cd9faf78386de27e38b15227ae Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Sun, 6 Jul 2025 18:29:09 -0700 Subject: [PATCH 1/7] feat: Add JNI-based Hadoop FileSystem support for S3 and other Hadoop-compatible stores --- .../java/org/apache/comet/parquet/Native.java | 108 ++++++ .../scala/org/apache/comet/CometConf.scala | 9 + native/Cargo.lock | 1 + native/core/Cargo.toml | 1 + native/core/src/execution/jni_api.rs | 5 + native/core/src/parquet/mod.rs | 2 +- native/core/src/parquet/objectstore/jni.rs | 353 ++++++++++++++++++ native/core/src/parquet/objectstore/mod.rs | 1 + native/core/src/parquet/parquet_support.rs | 20 + .../apache/comet/serde/QueryPlanSerde.scala | 8 +- .../parquet/ParquetReadFromS3Suite.scala | 19 +- 11 files changed, 520 insertions(+), 7 deletions(-) create mode 100644 native/core/src/parquet/objectstore/jni.rs diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index c9ad447fd2..91eb5145d4 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -19,9 +19,16 @@ package org.apache.comet.parquet; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import org.apache.comet.NativeBase; public final class Native extends NativeBase { @@ -292,4 +299,105 @@ public static native void currentColumnBatch( * @param handle */ public static native void closeRecordBatchReader(long handle); + + /** + * Reads a byte range from a file using Hadoop FileSystem API. + * + * @param path The file path to read from + * @param configs Configuration properties for the filesystem + * @param offset Starting byte position (0-based) + * @param len Number of bytes to read + * @return Byte array containing the read data, or null if error occurs + * @throws IllegalArgumentException If parameters are invalid + */ + public static byte[] read(String path, Map configs, long offset, int len) { + if (path == null || path.isEmpty()) { + throw new IllegalArgumentException("Path cannot be null or empty"); + } + if (offset < 0) { + throw new IllegalArgumentException("Offset cannot be negative"); + } + if (len < 0) { + throw new IllegalArgumentException("Length cannot be negative"); + } + + try { + Path p = new Path(path); + Configuration conf = new Configuration(); + + // Set configurations if provided + if (configs != null) { + for (Map.Entry entry : configs.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + } + org.apache.hadoop.fs.FileSystem fs = p.getFileSystem(conf); + + long fileLen = fs.getFileStatus(p).getLen(); + + if (offset > fileLen) { + throw new IOException( + "Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen); + } + + if (len == 0) { + return new byte[0]; + } + + // Adjust length if it exceeds remaining bytes + if (offset + len > fileLen) { + len = (int) (fileLen - offset); + if (len <= 0) { + return new byte[0]; + } + } + + FSDataInputStream inputStream = fs.open(p); + inputStream.seek(offset); + byte[] buffer = new byte[len]; + int totalBytesRead = 0; + while (totalBytesRead < len) { + int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead); + if (read == -1) break; + totalBytesRead += read; + } + inputStream.close(); + + return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer; + + } catch (Exception e) { + System.err.println("Native.read failed: " + e); + return null; + } + } + + /** + * Gets the length of a file using Hadoop FileSystem API. + * + * @param path The file path to check + * @param configs Configuration properties for the filesystem + * @return File length in bytes, or -1 if the file doesn't exist + * @throws IllegalArgumentException If path is invalid or configs contain invalid values + */ + public static long getLength(String path, Map configs) { + if (path == null || path.isEmpty()) { + throw new IllegalArgumentException("Path cannot be null or empty"); + } + + try { + Path p = new Path(path); + Configuration conf = new Configuration(); + if (configs != null) { + for (Map.Entry entry : configs.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + } + + FileSystem fs = p.getFileSystem(conf); + return fs.getFileStatus(p).getLen(); + } catch (Exception e) { + System.err.println("Native.getLength failed: " + e); + return -1; + } + } } diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 4575a2fb78..b73b23d169 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -65,6 +65,15 @@ object CometConf extends ShimCometConf { val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec"; + val COMET_USE_JNI_OBJECT_STORE: ConfigEntry[Boolean] = + conf("spark.comet.use_jni_object_store") + .doc( + "If enabled, Comet will access Hadoop-compatible file systems using the Hadoop FileSystem" + + " API via JNI, bypassing the native Rust object store implementations.") + .internal() + .booleanConf + .createWithDefault(false) + val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") .doc( "Whether to enable Comet extension for Spark. When this is turned on, Spark will use " + diff --git a/native/Cargo.lock b/native/Cargo.lock index b6e4c0e0f7..af8eee2027 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1370,6 +1370,7 @@ dependencies = [ "aws-config", "aws-credential-types", "bytes", + "chrono", "crc32fast", "criterion", "datafusion", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index e248068b35..8c15905e92 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -50,6 +50,7 @@ lazy_static = "1.4.0" prost = "0.13.5" jni = "0.21" snap = "1.1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } # we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation lz4_flex = { version = "0.11.3", default-features = false } zstd = "0.13.3" diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a7ddce34fd..950c10535f 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -77,6 +77,9 @@ use log::info; use once_cell::sync::Lazy; #[cfg(feature = "jemalloc")] use tikv_jemalloc_ctl::{epoch, stats}; +use crate::parquet::objectstore::jni::init_jvm; + + static TOKIO_RUNTIME: Lazy = Lazy::new(|| { let mut builder = tokio::runtime::Builder::new_multi_thread(); @@ -166,6 +169,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( ) -> jlong { try_unwrap_or_throw(&e, |mut env| { with_trace("createPlan", tracing_enabled != JNI_FALSE, || { + init_jvm(&env); + // Init JVM classes JVMClasses::init(&mut env); diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 2f23a8cfd8..d13603af7d 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -26,7 +26,7 @@ pub mod parquet_support; pub mod read; pub mod schema_adapter; -mod objectstore; +pub mod objectstore; use std::collections::HashMap; use std::task::Poll; diff --git a/native/core/src/parquet/objectstore/jni.rs b/native/core/src/parquet/objectstore/jni.rs new file mode 100644 index 0000000000..224cc5aa22 --- /dev/null +++ b/native/core/src/parquet/objectstore/jni.rs @@ -0,0 +1,353 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::Utc; +use futures::{stream, stream::BoxStream, StreamExt}; +use jni::{ + objects::{JClass, JObject, JValue}, + JNIEnv, JavaVM +}; +use once_cell::sync::OnceCell; +use object_store::{ + path::Path, + Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, GetResultPayload, + ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, + PutResult, +}; + +static JVM: OnceCell = OnceCell::new(); + +pub fn init_jvm(env: &JNIEnv) { + let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM")); +} + +fn get_jni_env<'a>() -> jni::AttachGuard<'a> { + JVM.get() + .expect("JVM not initialized") + .attach_current_thread() + .expect("Failed to attach thread") +} + +mod jni_helpers { + use super::*; + + pub fn create_jni_hashmap<'local>( + env: &mut JNIEnv<'local>, + configs: &HashMap, + ) -> Result, ObjectStoreError> { + let map_class = env.find_class("java/util/HashMap").map_err(jni_error)?; + let jmap = env.new_object(map_class, "()V", &[]).map_err(jni_error)?; + + for (k, v) in configs { + let jkey = env.new_string(k).map_err(jni_error)?; + let jval = env.new_string(v).map_err(jni_error)?; + + env.call_method( + &jmap, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + &[JValue::Object(&jkey), JValue::Object(&jval)], + ) + .map_err(jni_error)?; + } + + Ok(jmap) + } + + pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError { + ObjectStoreError::Generic { + store: "jni", + source: Box::new(e), + } + } + + pub fn get_native_class<'local>( + env: &mut JNIEnv<'local>, + ) -> Result, ObjectStoreError> { + env.find_class("org/apache/comet/parquet/Native") + .map_err(jni_error) + } +} + +/// Retrieves the length (in bytes) of a file through JNI interface. +/// +/// This method makes a JNI call to Java to get the size of the file at the specified path, +/// using the provided configuration parameters for the storage backend. +/// +/// # Arguments +/// * `path` - The filesystem path or URI of the target file +/// * `configs` - Configuration parameters for the storage backend as key-value pairs. +/// Common configurations include authentication credentials, region settings, +/// and timeout values. +/// +/// # Returns +/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` +/// if the operation fails. +pub fn call_get_length( + path: &str, + configs: &HashMap, +) -> Result { + let mut env = get_jni_env(); + let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?; + let class = jni_helpers::get_native_class(&mut env)?; + let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?; + + let result = env + .call_static_method( + class, + "getLength", + "(Ljava/lang/String;Ljava/util/Map;)J", + &[JValue::Object(&jpath), JValue::Object(&jmap)], + ) + .map_err(jni_helpers::jni_error)? + .j() + .unwrap_or(-1); + + if result < 0 { + Err(ObjectStoreError::NotFound { + path: path.to_string(), + source: Box::new(Arc::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("File not found or error reading: {}", path), + ))), + }) + } else { + Ok(result as usize) + } +} + +/// Reads a range of bytes from a file through JNI interface. +/// +/// # Arguments +/// * `raw_path` - The filesystem path or URI of the file to read +/// * `configs` - Configuration parameters for the read operation as key-value pairs +/// * `offset` - The starting byte position to read from (0-based) +/// * `len` - The number of bytes to read +/// +/// # Returns +/// Returns `Ok(Vec)` containing the requested bytes on success, or an +/// `ObjectStoreError` if the operation fails. +pub fn call_read( + path: &str, + configs: &HashMap, + offset: usize, + len: usize, +) -> Result, ObjectStoreError> { + let mut env = get_jni_env(); + let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?; + let class = jni_helpers::get_native_class(&mut env)?; + + let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?; + + let result = env + .call_static_method( + class, + "read", + "(Ljava/lang/String;Ljava/util/Map;JI)[B", + &[ + JValue::Object(&jpath), + JValue::Object(&jmap), + JValue::Long(offset as i64), + JValue::Int(len as i32), + ], + ) + .map_err(jni_helpers::jni_error)?; + + let byte_array = jni::objects::JByteArray::from( + result + .l() + .map_err(jni_helpers::jni_error)?, + ); + + if byte_array.is_null() { + return Err(ObjectStoreError::Generic { + store: "jni", + source: "Received null byte array from Java".into(), + }); + } + + let output = env + .convert_byte_array(byte_array) + .map_err(jni_helpers::jni_error)?; + Ok(output) +} + +/// A JNI-backed implementation of [`ObjectStore`] for interacting with storage systems +/// through Java Native Interface. +#[derive(Debug, Clone)] +pub struct JniObjectStore { + base_uri: String, + configs: HashMap, +} + +// Mark as thread-safe +unsafe impl Send for JniObjectStore {} +unsafe impl Sync for JniObjectStore {} + +impl JniObjectStore { + /// Creates a new JniObjectStore with the given base URI and configurations. + pub fn new(base_uri: String, configs: HashMap) -> Self { + Self { + base_uri: base_uri.trim_end_matches('/').to_string(), + configs, + } + } + + /// Converts a relative path to an absolute URI using the store's base URI. + fn to_absolute_uri(&self, location: &Path) -> String { + let path_str = location.to_string(); + + // If already absolute-looking (s3a://, hdfs://, etc.), return as is + if path_str.contains("://") { + return path_str; + } + + // Handle absolute-looking paths (start with /) + let clean_path = path_str.trim_start_matches('/'); + format!("{}/{}", self.base_uri, clean_path) + } +} + +impl std::fmt::Display for JniObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "JniObjectStore") + } +} + +#[async_trait::async_trait] +impl ObjectStore for JniObjectStore { + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + let path_str = self.to_absolute_uri(location); + let total_len = call_get_length(&path_str, &self.configs)? as u64; + + let range = match options.range { + Some(GetRange::Bounded(range)) => range, + Some(GetRange::Offset(offset)) => offset..total_len, + Some(GetRange::Suffix(length)) => { + let start = total_len.saturating_sub(length); + start..total_len + } + None => 0..total_len, + }; + + if range.end > total_len { + return Err(ObjectStoreError::NotFound { + path: path_str.clone(), + source: format!( + "Invalid range {}-{} for file of length {}", + range.start, + range.end, + total_len + ).into(), + }); + } + + let range_len = (range.end - range.start) as usize; + let bytes = call_read(&path_str, &self.configs, range.start as usize, range_len)?; + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once(async move { + Ok(Bytes::from(bytes)) + }))), + meta: ObjectMeta { + location: location.clone(), + last_modified: Utc::now(), + size: total_len, + version: None, + e_tag: None, + }, + range, + attributes: Attributes::default(), + }) + } + + async fn put_opts( + &self, + _location: &Path, + _bytes: PutPayload, + _opts: PutOptions, + ) -> Result { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> Result, ObjectStoreError> { + todo!() + } + + async fn delete( + &self, + _location: &Path, + ) -> Result<(), ObjectStoreError> { + todo!() + } + + fn list( + &self, + _prefix: Option<&Path>, + ) -> BoxStream<'static, Result> { + futures::stream::empty().boxed() + } + + async fn list_with_delimiter( + &self, + _prefix: Option<&Path>, + ) -> Result { + todo!() + } + + async fn copy( + &self, + _from: &Path, + _to: &Path, + ) -> Result<(), ObjectStoreError> { + todo!() + } + + async fn copy_if_not_exists( + &self, + _from: &Path, + _to: &Path, + ) -> Result<(), ObjectStoreError> { + todo!() + } + async fn head( + &self, + location: &Path, + ) -> Result { + let path = location.to_string(); + let len = call_get_length(&path, &self.configs)? as usize; + Ok(ObjectMeta { + location: location.clone(), + last_modified: Utc::now(), + size: len as u64, + version: None, + e_tag: None, + }) + } +} diff --git a/native/core/src/parquet/objectstore/mod.rs b/native/core/src/parquet/objectstore/mod.rs index bedae08f69..5317d05561 100644 --- a/native/core/src/parquet/objectstore/mod.rs +++ b/native/core/src/parquet/objectstore/mod.rs @@ -16,3 +16,4 @@ // under the License. pub mod s3; +pub mod jni; \ No newline at end of file diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 5c425c6688..929719a9f1 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -43,6 +43,8 @@ use std::{fmt::Debug, hash::Hash, sync::Arc}; use url::Url; use super::objectstore; +use super::objectstore::jni::JniObjectStore; +use datafusion::execution::context::SessionContext; // This file originates from cast.rs. While developing native scan support and implementing // SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not @@ -367,8 +369,15 @@ pub(crate) fn prepare_object_store_with_configs( url: String, object_store_configs: &HashMap, ) -> Result<(ObjectStoreUrl, Path), ExecutionError> { + let use_jni = object_store_configs + .get("use_jni") + .map(|v| v == "true") + .unwrap_or(false); + let mut url = Url::parse(url.as_str()) .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?; + let original_scheme = url.scheme().to_string(); + let mut scheme = url.scheme(); if scheme == "s3a" { scheme = "s3"; @@ -384,6 +393,17 @@ pub(crate) fn prepare_object_store_with_configs( let (object_store, object_store_path): (Box, Path) = if scheme == "hdfs" { parse_hdfs_url(&url) + } else if scheme == "s3" && use_jni { + let base_uri = format!( + "{}://{}", + original_scheme, + &url[url::Position::BeforeHost..url::Position::AfterPort] + ); + let store = JniObjectStore::new(base_uri, object_store_configs.clone()); + Ok(( + Box::new(store) as Box, + Path::from(url.path()), + )) } else if scheme == "s3" { objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) } else { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index aebdbdea94..608b305cd9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -50,6 +50,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf +import org.apache.comet.CometConf.COMET_USE_JNI_OBJECT_STORE import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} import org.apache.comet.expressions._ import org.apache.comet.objectstore.NativeConfig @@ -2284,8 +2285,13 @@ object QueryPlanSerde extends Logging with CometExprShim { val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) firstPartition.foreach { partitionFile => - val objectStoreOptions = + val baseOptions = NativeConfig.extractObjectStoreOptions(hadoopConf, partitionFile.pathUri) + + val useJni = + COMET_USE_JNI_OBJECT_STORE.get(scan.relation.sparkSession.sessionState.conf) + val objectStoreOptions = baseOptions.updated("use_jni", useJni.toString) + objectStoreOptions.foreach { case (key, value) => nativeScanBuilder.putObjectStoreOptions(key, value) } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala index f94e53ed94..b0f4a56460 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.functions.{col, sum} import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.services.s3.model.CreateBucketRequest import software.amazon.awssdk.services.s3.model.HeadBucketRequest @@ -103,19 +104,27 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper } test("read parquet file from MinIO") { + runParquetScanAndAssert() + } + + test("Comet uses JNI object store when use_jni is true") { + spark.conf.set("spark.comet.use_jni_object_store", "true") + runParquetScanAndAssert() + } + private def runParquetScanAndAssert(): Unit = { val testFilePath = s"s3a://$testBucketName/data/test-file.parquet" + writeTestParquetFile(testFilePath) val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) + val scans = collect(df.queryExecution.executedPlan) { - case p: CometScanExec => - p - case p: CometNativeScanExec => - p + case p: CometScanExec => p + case p: CometNativeScanExec => p } - assert(scans.size == 1) + assert(scans.size == 1) assert(df.first().getLong(0) == 499500) } } From bf465d08d17559a054df9ffc657b39dd426328e7 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Sun, 6 Jul 2025 23:45:09 -0700 Subject: [PATCH 2/7] fix format --- .../java/org/apache/comet/parquet/Native.java | 1 - native/core/src/execution/jni_api.rs | 2 - native/core/src/parquet/objectstore/jni.rs | 45 ++++++------------- native/core/src/parquet/objectstore/mod.rs | 2 +- .../parquet/ParquetReadFromS3Suite.scala | 2 - 5 files changed, 14 insertions(+), 38 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index 91eb5145d4..6526d8ee7d 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -364,7 +364,6 @@ public static byte[] read(String path, Map configs, long offset, inputStream.close(); return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer; - } catch (Exception e) { System.err.println("Native.read failed: " + e); return null; diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 950c10535f..e5efd4b9cf 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -79,8 +79,6 @@ use once_cell::sync::Lazy; use tikv_jemalloc_ctl::{epoch, stats}; use crate::parquet::objectstore::jni::init_jvm; - - static TOKIO_RUNTIME: Lazy = Lazy::new(|| { let mut builder = tokio::runtime::Builder::new_multi_thread(); if let Some(n) = parse_usize_env_var("COMET_WORKER_THREADS") { diff --git a/native/core/src/parquet/objectstore/jni.rs b/native/core/src/parquet/objectstore/jni.rs index 224cc5aa22..7e88c47d92 100644 --- a/native/core/src/parquet/objectstore/jni.rs +++ b/native/core/src/parquet/objectstore/jni.rs @@ -25,13 +25,12 @@ use jni::{ objects::{JClass, JObject, JValue}, JNIEnv, JavaVM }; -use once_cell::sync::OnceCell; use object_store::{ - path::Path, - Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, GetResultPayload, - ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, - PutResult, + path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutPayload, PutResult, }; +use once_cell::sync::OnceCell; static JVM: OnceCell = OnceCell::new(); @@ -171,11 +170,7 @@ pub fn call_read( ) .map_err(jni_helpers::jni_error)?; - let byte_array = jni::objects::JByteArray::from( - result - .l() - .map_err(jni_helpers::jni_error)?, - ); + let byte_array = jni::objects::JByteArray::from(result.l().map_err(jni_helpers::jni_error)?); if byte_array.is_null() { return Err(ObjectStoreError::Generic { @@ -257,10 +252,9 @@ impl ObjectStore for JniObjectStore { path: path_str.clone(), source: format!( "Invalid range {}-{} for file of length {}", - range.start, - range.end, - total_len - ).into(), + range.start, range.end, total_len + ) + .into(), }); } @@ -300,10 +294,7 @@ impl ObjectStore for JniObjectStore { todo!() } - async fn delete( - &self, - _location: &Path, - ) -> Result<(), ObjectStoreError> { + async fn delete(&self, _location: &Path) -> Result<(), ObjectStoreError> { todo!() } @@ -321,25 +312,15 @@ impl ObjectStore for JniObjectStore { todo!() } - async fn copy( - &self, - _from: &Path, - _to: &Path, - ) -> Result<(), ObjectStoreError> { + async fn copy(&self, _from: &Path, _to: &Path) -> Result<(), ObjectStoreError> { todo!() } - async fn copy_if_not_exists( - &self, - _from: &Path, - _to: &Path, - ) -> Result<(), ObjectStoreError> { + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<(), ObjectStoreError> { todo!() } - async fn head( - &self, - location: &Path, - ) -> Result { + + async fn head(&self, location: &Path) -> Result { let path = location.to_string(); let len = call_get_length(&path, &self.configs)? as usize; Ok(ObjectMeta { diff --git a/native/core/src/parquet/objectstore/mod.rs b/native/core/src/parquet/objectstore/mod.rs index 5317d05561..ef23453995 100644 --- a/native/core/src/parquet/objectstore/mod.rs +++ b/native/core/src/parquet/objectstore/mod.rs @@ -15,5 +15,5 @@ // specific language governing permissions and limitations // under the License. +pub mod jni; pub mod s3; -pub mod jni; \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala index b0f4a56460..555e38c25e 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.functions.{col, sum} import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider -import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.services.s3.model.CreateBucketRequest import software.amazon.awssdk.services.s3.model.HeadBucketRequest @@ -114,7 +113,6 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper private def runParquetScanAndAssert(): Unit = { val testFilePath = s"s3a://$testBucketName/data/test-file.parquet" - writeTestParquetFile(testFilePath) val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) From 81cb9bd5b17f7e7971999a625223c96b817e62c6 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Sun, 6 Jul 2025 23:55:37 -0700 Subject: [PATCH 3/7] fix format --- native/core/src/execution/jni_api.rs | 2 +- native/core/src/parquet/objectstore/jni.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index e5efd4b9cf..a1794d537a 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -72,12 +72,12 @@ use crate::execution::spark_plan::SparkPlan; use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace}; +use crate::parquet::objectstore::jni::init_jvm; use datafusion_comet_proto::spark_operator::operator::OpStruct; use log::info; use once_cell::sync::Lazy; #[cfg(feature = "jemalloc")] use tikv_jemalloc_ctl::{epoch, stats}; -use crate::parquet::objectstore::jni::init_jvm; static TOKIO_RUNTIME: Lazy = Lazy::new(|| { let mut builder = tokio::runtime::Builder::new_multi_thread(); diff --git a/native/core/src/parquet/objectstore/jni.rs b/native/core/src/parquet/objectstore/jni.rs index 7e88c47d92..38b6371711 100644 --- a/native/core/src/parquet/objectstore/jni.rs +++ b/native/core/src/parquet/objectstore/jni.rs @@ -23,7 +23,7 @@ use chrono::Utc; use futures::{stream, stream::BoxStream, StreamExt}; use jni::{ objects::{JClass, JObject, JValue}, - JNIEnv, JavaVM + JNIEnv, JavaVM, }; use object_store::{ path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, @@ -252,7 +252,7 @@ impl ObjectStore for JniObjectStore { path: path_str.clone(), source: format!( "Invalid range {}-{} for file of length {}", - range.start, range.end, total_len + range.start, range.end, total_len ) .into(), }); From 63e815f5c01cdced3d13db32ddb86a0cace75240 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Mon, 7 Jul 2025 00:10:37 -0700 Subject: [PATCH 4/7] fix format --- native/core/src/parquet/objectstore/jni.rs | 4 +--- native/core/src/parquet/parquet_support.rs | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/native/core/src/parquet/objectstore/jni.rs b/native/core/src/parquet/objectstore/jni.rs index 38b6371711..fbe09054eb 100644 --- a/native/core/src/parquet/objectstore/jni.rs +++ b/native/core/src/parquet/objectstore/jni.rs @@ -94,8 +94,6 @@ mod jni_helpers { /// # Arguments /// * `path` - The filesystem path or URI of the target file /// * `configs` - Configuration parameters for the storage backend as key-value pairs. -/// Common configurations include authentication credentials, region settings, -/// and timeout values. /// /// # Returns /// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` @@ -125,7 +123,7 @@ pub fn call_get_length( path: path.to_string(), source: Box::new(Arc::new(std::io::Error::new( std::io::ErrorKind::NotFound, - format!("File not found or error reading: {}", path), + format!("File not found or error reading: {path}"), ))), }) } else { diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 929719a9f1..6ab4366da3 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -44,7 +44,6 @@ use url::Url; use super::objectstore; use super::objectstore::jni::JniObjectStore; -use datafusion::execution::context::SessionContext; // This file originates from cast.rs. While developing native scan support and implementing // SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not From 8e2fcc0585a051a4fac4daf5329965b7d3c25128 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Mon, 7 Jul 2025 23:40:57 -0700 Subject: [PATCH 5/7] address comments --- .../apache/comet/parquet/JniHDFSBridge.java | 131 ++++++++++++++++++ .../java/org/apache/comet/parquet/Native.java | 107 -------------- native/core/Cargo.toml | 2 +- native/core/src/execution/jni_api.rs | 2 +- .../objectstore/{jni.rs => jni_hdfs.rs} | 14 +- native/core/src/parquet/objectstore/mod.rs | 2 +- native/core/src/parquet/parquet_support.rs | 14 +- 7 files changed, 152 insertions(+), 120 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java rename native/core/src/parquet/objectstore/{jni.rs => jni_hdfs.rs} (96%) diff --git a/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java b/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java new file mode 100644 index 0000000000..a69d121fb8 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public final class JniHDFSBridge { + /** + * Reads a byte range from a file using Hadoop FileSystem API. + * + * @param path The file path to read from + * @param configs Configuration properties for the filesystem + * @param offset Starting byte position (0-based) + * @param len Number of bytes to read + * @return Byte array containing the read data, or null if error occurs + * @throws IllegalArgumentException If parameters are invalid + */ + public static byte[] read(String path, Map configs, long offset, int len) { + if (path == null || path.isEmpty()) { + throw new IllegalArgumentException("Path cannot be null or empty"); + } + if (offset < 0) { + throw new IllegalArgumentException("Offset cannot be negative"); + } + if (len < 0) { + throw new IllegalArgumentException("Length cannot be negative"); + } + + try { + Path p = new Path(path); + Configuration conf = new Configuration(); + + // Set configurations if provided + if (configs != null) { + for (Map.Entry entry : configs.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + } + FileSystem fs = p.getFileSystem(conf); + + long fileLen = fs.getFileStatus(p).getLen(); + + if (offset > fileLen) { + throw new IOException( + "Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen); + } + + if (len == 0) { + return new byte[0]; + } + + // Adjust length if it exceeds remaining bytes + if (offset + len > fileLen) { + len = (int) (fileLen - offset); + if (len <= 0) { + return new byte[0]; + } + } + + FSDataInputStream inputStream = fs.open(p); + inputStream.seek(offset); + byte[] buffer = new byte[len]; + int totalBytesRead = 0; + while (totalBytesRead < len) { + int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead); + if (read == -1) break; + totalBytesRead += read; + } + inputStream.close(); + + return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer; + } catch (Exception e) { + System.err.println("Native.read failed: " + e); + return null; + } + } + + /** + * Gets the length of a file using Hadoop FileSystem API. + * + * @param path The file path to check + * @param configs Configuration properties for the filesystem + * @return File length in bytes, or -1 if the file doesn't exist + * @throws IllegalArgumentException If path is invalid or configs contain invalid values + */ + public static long getLength(String path, Map configs) { + if (path == null || path.isEmpty()) { + throw new IllegalArgumentException("Path cannot be null or empty"); + } + + try { + Path p = new Path(path); + Configuration conf = new Configuration(); + if (configs != null) { + for (Map.Entry entry : configs.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + } + + FileSystem fs = p.getFileSystem(conf); + return fs.getFileStatus(p).getLen(); + } catch (Exception e) { + System.err.println("Native.getLength failed: " + e); + return -1; + } + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index 6526d8ee7d..c9ad447fd2 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -19,16 +19,9 @@ package org.apache.comet.parquet; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.comet.NativeBase; public final class Native extends NativeBase { @@ -299,104 +292,4 @@ public static native void currentColumnBatch( * @param handle */ public static native void closeRecordBatchReader(long handle); - - /** - * Reads a byte range from a file using Hadoop FileSystem API. - * - * @param path The file path to read from - * @param configs Configuration properties for the filesystem - * @param offset Starting byte position (0-based) - * @param len Number of bytes to read - * @return Byte array containing the read data, or null if error occurs - * @throws IllegalArgumentException If parameters are invalid - */ - public static byte[] read(String path, Map configs, long offset, int len) { - if (path == null || path.isEmpty()) { - throw new IllegalArgumentException("Path cannot be null or empty"); - } - if (offset < 0) { - throw new IllegalArgumentException("Offset cannot be negative"); - } - if (len < 0) { - throw new IllegalArgumentException("Length cannot be negative"); - } - - try { - Path p = new Path(path); - Configuration conf = new Configuration(); - - // Set configurations if provided - if (configs != null) { - for (Map.Entry entry : configs.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - } - org.apache.hadoop.fs.FileSystem fs = p.getFileSystem(conf); - - long fileLen = fs.getFileStatus(p).getLen(); - - if (offset > fileLen) { - throw new IOException( - "Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen); - } - - if (len == 0) { - return new byte[0]; - } - - // Adjust length if it exceeds remaining bytes - if (offset + len > fileLen) { - len = (int) (fileLen - offset); - if (len <= 0) { - return new byte[0]; - } - } - - FSDataInputStream inputStream = fs.open(p); - inputStream.seek(offset); - byte[] buffer = new byte[len]; - int totalBytesRead = 0; - while (totalBytesRead < len) { - int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead); - if (read == -1) break; - totalBytesRead += read; - } - inputStream.close(); - - return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer; - } catch (Exception e) { - System.err.println("Native.read failed: " + e); - return null; - } - } - - /** - * Gets the length of a file using Hadoop FileSystem API. - * - * @param path The file path to check - * @param configs Configuration properties for the filesystem - * @return File length in bytes, or -1 if the file doesn't exist - * @throws IllegalArgumentException If path is invalid or configs contain invalid values - */ - public static long getLength(String path, Map configs) { - if (path == null || path.isEmpty()) { - throw new IllegalArgumentException("Path cannot be null or empty"); - } - - try { - Path p = new Path(path); - Configuration conf = new Configuration(); - if (configs != null) { - for (Map.Entry entry : configs.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - } - - FileSystem fs = p.getFileSystem(conf); - return fs.getFileStatus(p).getLen(); - } catch (Exception e) { - System.err.println("Native.getLength failed: " + e); - return -1; - } - } } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 8c15905e92..68898b1ea5 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -36,6 +36,7 @@ publish = false [dependencies] arrow = { workspace = true } +chrono = { workspace = true } parquet = { workspace = true, default-features = false, features = ["experimental"] } futures = { workspace = true } mimalloc = { version = "*", default-features = false, optional = true } @@ -50,7 +51,6 @@ lazy_static = "1.4.0" prost = "0.13.5" jni = "0.21" snap = "1.1" -chrono = { version = "0.4", default-features = false, features = ["clock"] } # we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation lz4_flex = { version = "0.11.3", default-features = false } zstd = "0.13.3" diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a1794d537a..a6e563841b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -72,7 +72,7 @@ use crate::execution::spark_plan::SparkPlan; use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace}; -use crate::parquet::objectstore::jni::init_jvm; +use crate::parquet::objectstore::jni_hdfs::init_jvm; use datafusion_comet_proto::spark_operator::operator::OpStruct; use log::info; use once_cell::sync::Lazy; diff --git a/native/core/src/parquet/objectstore/jni.rs b/native/core/src/parquet/objectstore/jni_hdfs.rs similarity index 96% rename from native/core/src/parquet/objectstore/jni.rs rename to native/core/src/parquet/objectstore/jni_hdfs.rs index fbe09054eb..c75995a5a9 100644 --- a/native/core/src/parquet/objectstore/jni.rs +++ b/native/core/src/parquet/objectstore/jni_hdfs.rs @@ -81,7 +81,7 @@ mod jni_helpers { pub fn get_native_class<'local>( env: &mut JNIEnv<'local>, ) -> Result, ObjectStoreError> { - env.find_class("org/apache/comet/parquet/Native") + env.find_class("org/apache/comet/parquet/JniHDFSBridge") .map_err(jni_error) } } @@ -98,7 +98,7 @@ mod jni_helpers { /// # Returns /// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` /// if the operation fails. -pub fn call_get_length( +pub fn get_length( path: &str, configs: &HashMap, ) -> Result { @@ -142,7 +142,7 @@ pub fn call_get_length( /// # Returns /// Returns `Ok(Vec)` containing the requested bytes on success, or an /// `ObjectStoreError` if the operation fails. -pub fn call_read( +pub fn read( path: &str, configs: &HashMap, offset: usize, @@ -233,7 +233,7 @@ impl ObjectStore for JniObjectStore { options: GetOptions, ) -> Result { let path_str = self.to_absolute_uri(location); - let total_len = call_get_length(&path_str, &self.configs)? as u64; + let total_len = get_length(&path_str, &self.configs)? as u64; let range = match options.range { Some(GetRange::Bounded(range)) => range, @@ -257,7 +257,7 @@ impl ObjectStore for JniObjectStore { } let range_len = (range.end - range.start) as usize; - let bytes = call_read(&path_str, &self.configs, range.start as usize, range_len)?; + let bytes = read(&path_str, &self.configs, range.start as usize, range_len)?; Ok(GetResult { payload: GetResultPayload::Stream(Box::pin(stream::once(async move { @@ -300,7 +300,7 @@ impl ObjectStore for JniObjectStore { &self, _prefix: Option<&Path>, ) -> BoxStream<'static, Result> { - futures::stream::empty().boxed() + todo!() } async fn list_with_delimiter( @@ -320,7 +320,7 @@ impl ObjectStore for JniObjectStore { async fn head(&self, location: &Path) -> Result { let path = location.to_string(); - let len = call_get_length(&path, &self.configs)? as usize; + let len = get_length(&path, &self.configs)? as usize; Ok(ObjectMeta { location: location.clone(), last_modified: Utc::now(), diff --git a/native/core/src/parquet/objectstore/mod.rs b/native/core/src/parquet/objectstore/mod.rs index ef23453995..b7d9506763 100644 --- a/native/core/src/parquet/objectstore/mod.rs +++ b/native/core/src/parquet/objectstore/mod.rs @@ -15,5 +15,5 @@ // specific language governing permissions and limitations // under the License. -pub mod jni; +pub mod jni_hdfs; pub mod s3; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 6ab4366da3..217fa6c622 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -37,13 +37,13 @@ use datafusion::physical_plan::ColumnarValue; use datafusion_comet_spark_expr::EvalMode; use object_store::path::Path; use object_store::{parse_url, ObjectStore}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use std::{fmt::Debug, hash::Hash, sync::Arc}; use url::Url; use super::objectstore; -use super::objectstore::jni::JniObjectStore; +use super::objectstore::jni_hdfs::JniObjectStore; // This file originates from cast.rs. While developing native scan support and implementing // SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not @@ -390,9 +390,17 @@ pub(crate) fn prepare_object_store_with_configs( &url[url::Position::BeforeHost..url::Position::AfterPort], ); + let mut hadoop_schemes = HashSet::new(); + for (key, _) in object_store_configs.iter() { + if let Some(scheme) = key.strip_prefix("spark.hadoop.fs.") + .and_then(|k| k.strip_suffix(".impl")) { + hadoop_schemes.insert(scheme.to_string()); + } + } + let (object_store, object_store_path): (Box, Path) = if scheme == "hdfs" { parse_hdfs_url(&url) - } else if scheme == "s3" && use_jni { + } else if use_jni && hadoop_schemes.contains(scheme) { let base_uri = format!( "{}://{}", original_scheme, From 87b936e5d78f93587980e5738f6b1767bc7148e3 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Mon, 7 Jul 2025 23:48:39 -0700 Subject: [PATCH 6/7] fix style --- native/core/src/parquet/objectstore/jni_hdfs.rs | 2 +- native/core/src/parquet/parquet_support.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/native/core/src/parquet/objectstore/jni_hdfs.rs b/native/core/src/parquet/objectstore/jni_hdfs.rs index c75995a5a9..57f2d8b583 100644 --- a/native/core/src/parquet/objectstore/jni_hdfs.rs +++ b/native/core/src/parquet/objectstore/jni_hdfs.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use bytes::Bytes; use chrono::Utc; -use futures::{stream, stream::BoxStream, StreamExt}; +use futures::{stream, stream::BoxStream}; use jni::{ objects::{JClass, JObject, JValue}, JNIEnv, JavaVM, diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 217fa6c622..28c7ed694d 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -392,8 +392,10 @@ pub(crate) fn prepare_object_store_with_configs( let mut hadoop_schemes = HashSet::new(); for (key, _) in object_store_configs.iter() { - if let Some(scheme) = key.strip_prefix("spark.hadoop.fs.") - .and_then(|k| k.strip_suffix(".impl")) { + if let Some(scheme) = key + .strip_prefix("spark.hadoop.fs.") + .and_then(|k| k.strip_suffix(".impl")) + { hadoop_schemes.insert(scheme.to_string()); } } From 97e6aeead46520447b8916062aeece900d050384 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Mon, 7 Jul 2025 23:55:16 -0700 Subject: [PATCH 7/7] fix style --- native/core/src/parquet/parquet_support.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 28c7ed694d..4690ba8227 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -395,7 +395,7 @@ pub(crate) fn prepare_object_store_with_configs( if let Some(scheme) = key .strip_prefix("spark.hadoop.fs.") .and_then(|k| k.strip_suffix(".impl")) - { + { hadoop_schemes.insert(scheme.to_string()); } }