diff --git a/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java b/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java
new file mode 100644
index 0000000000..a69d121fb8
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class JniHDFSBridge {
+  /**
+   * Reads a byte range from a file using Hadoop FileSystem API.
+   *
+   * @param path The file path to read from
+   * @param configs Configuration properties for the filesystem
+   * @param offset Starting byte position (0-based)
+   * @param len Number of bytes to read
+   * @return Byte array containing the read data, or null if error occurs
+   * @throws IllegalArgumentException If parameters are invalid
+   */
+  public static byte[] read(String path, Map<String, String> configs, long offset, int len) {
+    if (path == null || path.isEmpty()) {
+      throw new IllegalArgumentException("Path cannot be null or empty");
+    }
+    if (offset < 0) {
+      throw new IllegalArgumentException("Offset cannot be negative");
+    }
+    if (len < 0) {
+      throw new IllegalArgumentException("Length cannot be negative");
+    }
+
+    try {
+      Path p = new Path(path);
+      Configuration conf = new Configuration();
+
+      // Set configurations if provided
+      if (configs != null) {
+        for (Map.Entry<String, String> entry : configs.entrySet()) {
+          conf.set(entry.getKey(), entry.getValue());
+        }
+      }
+      FileSystem fs = p.getFileSystem(conf);
+
+      long fileLen = fs.getFileStatus(p).getLen();
+
+      if (offset > fileLen) {
+        throw new IOException(
+            "Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen);
+      }
+
+      if (len == 0) {
+        return new byte[0];
+      }
+
+      // Adjust length if it exceeds remaining bytes
+      if (offset + len > fileLen) {
+        len = (int) (fileLen - offset);
+        if (len <= 0) {
+          return new byte[0];
+        }
+      }
+
+      FSDataInputStream inputStream = fs.open(p);
+      inputStream.seek(offset);
+      byte[] buffer = new byte[len];
+      int totalBytesRead = 0;
+      while (totalBytesRead < len) {
+        int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead);
+        if (read == -1) break;
+        totalBytesRead += read;
+      }
+      inputStream.close();
+
+      return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer;
+    } catch (Exception e) {
+      System.err.println("Native.read failed: " + e);
+      return null;
+    }
+  }
+
+  /**
+   * Gets the length of a file using Hadoop FileSystem API.
+   *
+   * @param path The file path to check
+   * @param configs Configuration properties for the filesystem
+   * @return File length in bytes, or -1 if the file doesn't exist
+   * @throws IllegalArgumentException If path is invalid or configs contain invalid values
+   */
+  public static long getLength(String path, Map<String, String> configs) {
+    if (path == null || path.isEmpty()) {
+      throw new IllegalArgumentException("Path cannot be null or empty");
+    }
+
+    try {
+      Path p = new Path(path);
+      Configuration conf = new Configuration();
+      if (configs != null) {
+        for (Map.Entry<String, String> entry : configs.entrySet()) {
+          conf.set(entry.getKey(), entry.getValue());
+        }
+      }
+
+      FileSystem fs = p.getFileSystem(conf);
+      return fs.getFileStatus(p).getLen();
+    } catch (Exception e) {
+      System.err.println("Native.getLength failed: " + e);
+      return -1;
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 4575a2fb78..b73b23d169 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -65,6 +65,15 @@ object CometConf extends ShimCometConf {
 
   val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
 
+  val COMET_USE_JNI_OBJECT_STORE: ConfigEntry[Boolean] =
+    conf("spark.comet.use_jni_object_store")
+      .doc(
+        "If enabled, Comet will access Hadoop-compatible file systems using the Hadoop FileSystem" +
+          " API via JNI, bypassing the native Rust object store implementations.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
     .doc(
       "Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
diff --git a/native/Cargo.lock b/native/Cargo.lock
index b6e4c0e0f7..af8eee2027 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1370,6 +1370,7 @@ dependencies = [
  "aws-config",
  "aws-credential-types",
  "bytes",
+ "chrono",
  "crc32fast",
  "criterion",
  "datafusion",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index e248068b35..68898b1ea5 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -36,6 +36,7 @@ publish = false
 
 [dependencies]
 arrow = { workspace = true }
+chrono = { workspace = true }
 parquet = { workspace = true, default-features = false, features = ["experimental"] }
 futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index a7ddce34fd..a6e563841b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -72,6 +72,7 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
+use crate::parquet::objectstore::jni_hdfs::init_jvm;
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
 use log::info;
 use once_cell::sync::Lazy;
@@ -166,6 +167,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
+            init_jvm(&env);
+
             // Init JVM classes
             JVMClasses::init(&mut env);
 
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 2f23a8cfd8..d13603af7d 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -26,7 +26,7 @@ pub mod parquet_support;
 pub mod read;
 pub mod schema_adapter;
 
-mod objectstore;
+pub mod objectstore;
 
 use std::collections::HashMap;
 use std::task::Poll;
diff --git a/native/core/src/parquet/objectstore/jni_hdfs.rs b/native/core/src/parquet/objectstore/jni_hdfs.rs
new file mode 100644
index 0000000000..57f2d8b583
--- /dev/null
+++ b/native/core/src/parquet/objectstore/jni_hdfs.rs
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use chrono::Utc;
+use futures::{stream, stream::BoxStream};
+use jni::{
+    objects::{JClass, JObject, JValue},
+    JNIEnv, JavaVM,
+};
+use object_store::{
+    path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult,
+    GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
+    PutOptions, PutPayload, PutResult,
+};
+use once_cell::sync::OnceCell;
+
+static JVM: OnceCell<JavaVM> = OnceCell::new();
+
+pub fn init_jvm(env: &JNIEnv) {
+    let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM"));
+}
+
+fn get_jni_env<'a>() -> jni::AttachGuard<'a> {
+    JVM.get()
+        .expect("JVM not initialized")
+        .attach_current_thread()
+        .expect("Failed to attach thread")
+}
+
+mod jni_helpers {
+    use super::*;
+
+    pub fn create_jni_hashmap<'local>(
+        env: &mut JNIEnv<'local>,
+        configs: &HashMap<String, String>,
+    ) -> Result<JObject<'local>, ObjectStoreError> {
+        let map_class = env.find_class("java/util/HashMap").map_err(jni_error)?;
+        let jmap = env.new_object(map_class, "()V", &[]).map_err(jni_error)?;
+
+        for (k, v) in configs {
+            let jkey = env.new_string(k).map_err(jni_error)?;
+            let jval = env.new_string(v).map_err(jni_error)?;
+
+            env.call_method(
+                &jmap,
+                "put",
+                "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
+                &[JValue::Object(&jkey), JValue::Object(&jval)],
+            )
+            .map_err(jni_error)?;
+        }
+
+        Ok(jmap)
+    }
+
+    pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError {
+        ObjectStoreError::Generic {
+            store: "jni",
+            source: Box::new(e),
+        }
+    }
+
+    pub fn get_native_class<'local>(
+        env: &mut JNIEnv<'local>,
+    ) -> Result<JClass<'local>, ObjectStoreError> {
+        env.find_class("org/apache/comet/parquet/JniHDFSBridge")
+            .map_err(jni_error)
+    }
+}
+
+/// Retrieves the length (in bytes) of a file through JNI interface.
+///
+/// This method makes a JNI call to Java to get the size of the file at the specified path,
+/// using the provided configuration parameters for the storage backend.
+///
+/// # Arguments
+/// * `path` - The filesystem path or URI of the target file
+/// * `configs` - Configuration parameters for the storage backend as key-value pairs.
+///
+/// # Returns
+/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError`
+/// if the operation fails.
+pub fn get_length(
+    path: &str,
+    configs: &HashMap<String, String>,
+) -> Result<usize, ObjectStoreError> {
+    let mut env = get_jni_env();
+    let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?;
+    let class = jni_helpers::get_native_class(&mut env)?;
+    let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?;
+
+    let result = env
+        .call_static_method(
+            class,
+            "getLength",
+            "(Ljava/lang/String;Ljava/util/Map;)J",
+            &[JValue::Object(&jpath), JValue::Object(&jmap)],
+        )
+        .map_err(jni_helpers::jni_error)?
+        .j()
+        .unwrap_or(-1);
+
+    if result < 0 {
+        Err(ObjectStoreError::NotFound {
+            path: path.to_string(),
+            source: Box::new(Arc::new(std::io::Error::new(
+                std::io::ErrorKind::NotFound,
+                format!("File not found or error reading: {path}"),
+            ))),
+        })
+    } else {
+        Ok(result as usize)
+    }
+}
+
+/// Reads a range of bytes from a file through JNI interface.
+///
+/// # Arguments
+/// * `path` - The filesystem path or URI of the file to read
+/// * `configs` - Configuration parameters for the read operation as key-value pairs
+/// * `offset` - The starting byte position to read from (0-based)
+/// * `len` - The number of bytes to read
+///
+/// # Returns
+/// Returns `Ok(Vec<u8>)` containing the requested bytes on success, or an
+/// `ObjectStoreError` if the operation fails.
+pub fn read(
+    path: &str,
+    configs: &HashMap<String, String>,
+    offset: usize,
+    len: usize,
+) -> Result<Vec<u8>, ObjectStoreError> {
+    let mut env = get_jni_env();
+    let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?;
+    let class = jni_helpers::get_native_class(&mut env)?;
+
+    let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?;
+
+    let result = env
+        .call_static_method(
+            class,
+            "read",
+            "(Ljava/lang/String;Ljava/util/Map;JI)[B",
+            &[
+                JValue::Object(&jpath),
+                JValue::Object(&jmap),
+                JValue::Long(offset as i64),
+                JValue::Int(len as i32),
+            ],
+        )
+        .map_err(jni_helpers::jni_error)?;
+
+    let byte_array = jni::objects::JByteArray::from(result.l().map_err(jni_helpers::jni_error)?);
+
+    if byte_array.is_null() {
+        return Err(ObjectStoreError::Generic {
+            store: "jni",
+            source: "Received null byte array from Java".into(),
+        });
+    }
+
+    let output = env
+        .convert_byte_array(byte_array)
+        .map_err(jni_helpers::jni_error)?;
+    Ok(output)
+}
+
+/// A JNI-backed implementation of [`ObjectStore`] for interacting with storage systems
+/// through Java Native Interface.
+#[derive(Debug, Clone)]
+pub struct JniObjectStore {
+    base_uri: String,
+    configs: HashMap<String, String>,
+}
+
+// Mark as thread-safe
+unsafe impl Send for JniObjectStore {}
+unsafe impl Sync for JniObjectStore {}
+
+impl JniObjectStore {
+    /// Creates a new JniObjectStore with the given base URI and configurations.
+    pub fn new(base_uri: String, configs: HashMap<String, String>) -> Self {
+        Self {
+            base_uri: base_uri.trim_end_matches('/').to_string(),
+            configs,
+        }
+    }
+
+    /// Converts a relative path to an absolute URI using the store's base URI.
+    fn to_absolute_uri(&self, location: &Path) -> String {
+        let path_str = location.to_string();
+
+        // If already absolute-looking (s3a://, hdfs://, etc.), return as is
+        if path_str.contains("://") {
+            return path_str;
+        }
+
+        // Handle absolute-looking paths (start with /)
+        let clean_path = path_str.trim_start_matches('/');
+        format!("{}/{}", self.base_uri, clean_path)
+    }
+}
+
+impl std::fmt::Display for JniObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "JniObjectStore")
+    }
+}
+
+#[async_trait::async_trait]
+impl ObjectStore for JniObjectStore {
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> Result<GetResult, ObjectStoreError> {
+        let path_str = self.to_absolute_uri(location);
+        let total_len = get_length(&path_str, &self.configs)? as u64;
+
+        let range = match options.range {
+            Some(GetRange::Bounded(range)) => range,
+            Some(GetRange::Offset(offset)) => offset..total_len,
+            Some(GetRange::Suffix(length)) => {
+                let start = total_len.saturating_sub(length);
+                start..total_len
+            }
+            None => 0..total_len,
+        };
+
+        if range.end > total_len {
+            return Err(ObjectStoreError::NotFound {
+                path: path_str.clone(),
+                source: format!(
+                    "Invalid range {}-{} for file of length {}",
+                    range.start, range.end, total_len
+                )
+                .into(),
+            });
+        }
+
+        let range_len = (range.end - range.start) as usize;
+        let bytes = read(&path_str, &self.configs, range.start as usize, range_len)?;
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(Box::pin(stream::once(async move {
+                Ok(Bytes::from(bytes))
+            }))),
+            meta: ObjectMeta {
+                location: location.clone(),
+                last_modified: Utc::now(),
+                size: total_len,
+                version: None,
+                e_tag: None,
+            },
+            range,
+            attributes: Attributes::default(),
+        })
+    }
+
+    async fn put_opts(
+        &self,
+        _location: &Path,
+        _bytes: PutPayload,
+        _opts: PutOptions,
+    ) -> Result<PutResult, ObjectStoreError> {
+        todo!()
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        _location: &Path,
+        _opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>, ObjectStoreError> {
+        todo!()
+    }
+
+    async fn delete(&self, _location: &Path) -> Result<(), ObjectStoreError> {
+        todo!()
+    }
+
+    fn list(
+        &self,
+        _prefix: Option<&Path>,
+    ) -> BoxStream<'static, Result<ObjectMeta, ObjectStoreError>> {
+        todo!()
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        _prefix: Option<&Path>,
+    ) -> Result<ListResult, ObjectStoreError> {
+        todo!()
+    }
+
+    async fn copy(&self, _from: &Path, _to: &Path) -> Result<(), ObjectStoreError> {
+        todo!()
+    }
+
+    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<(), ObjectStoreError> {
+        todo!()
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta, ObjectStoreError> {
+        let path = self.to_absolute_uri(location); // resolve against base_uri, as in get_opts
+        let len = get_length(&path, &self.configs)? as usize;
+        Ok(ObjectMeta {
+            location: location.clone(),
+            last_modified: Utc::now(),
+            size: len as u64,
+            version: None,
+            e_tag: None,
+        })
+    }
+}
diff --git a/native/core/src/parquet/objectstore/mod.rs b/native/core/src/parquet/objectstore/mod.rs
index bedae08f69..b7d9506763 100644
--- a/native/core/src/parquet/objectstore/mod.rs
+++ b/native/core/src/parquet/objectstore/mod.rs
@@ -15,4 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod jni_hdfs;
 pub mod s3;
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 5c425c6688..4690ba8227 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -37,12 +37,13 @@ use datafusion::physical_plan::ColumnarValue;
 use datafusion_comet_spark_expr::EvalMode;
 use object_store::path::Path;
 use object_store::{parse_url, ObjectStore};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::time::Duration;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
 use url::Url;
 
 use super::objectstore;
+use super::objectstore::jni_hdfs::JniObjectStore;
 
 // This file originates from cast.rs. While developing native scan support and implementing
 // SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not
@@ -367,8 +368,15 @@ pub(crate) fn prepare_object_store_with_configs(
     url: String,
     object_store_configs: &HashMap<String, String>,
 ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+    let use_jni = object_store_configs
+        .get("use_jni")
+        .map(|v| v == "true")
+        .unwrap_or(false);
+
     let mut url = Url::parse(url.as_str())
         .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let original_scheme = url.scheme().to_string();
+
     let mut scheme = url.scheme();
     if scheme == "s3a" {
         scheme = "s3";
@@ -382,8 +390,29 @@ (
         &url[url::Position::BeforeHost..url::Position::AfterPort],
     );
 
+    let mut hadoop_schemes = HashSet::new();
+    for (key, _) in object_store_configs.iter() {
+        if let Some(scheme) = key
+            .strip_prefix("spark.hadoop.fs.")
+            .and_then(|k| k.strip_suffix(".impl"))
+        {
+            hadoop_schemes.insert(scheme.to_string());
+        }
+    }
+
     let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
         parse_hdfs_url(&url)
+    } else if use_jni && hadoop_schemes.contains(scheme) {
+        let base_uri = format!(
+            "{}://{}",
+            original_scheme,
+            &url[url::Position::BeforeHost..url::Position::AfterPort]
+        );
+        let store = JniObjectStore::new(base_uri, object_store_configs.clone());
+        Ok((
+            Box::new(store) as Box<dyn ObjectStore>,
+            Path::from(url.path()),
+        ))
     } else if scheme == "s3" {
         objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
     } else {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index aebdbdea94..608b305cd9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometConf.COMET_USE_JNI_OBJECT_STORE
 import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
 import org.apache.comet.expressions._
 import org.apache.comet.objectstore.NativeConfig
@@ -2284,8 +2285,13 @@
           val hadoopConf = scan.relation.sparkSession.sessionState
             .newHadoopConfWithOptions(scan.relation.options)
           firstPartition.foreach { partitionFile =>
-            val objectStoreOptions =
+            val baseOptions =
               NativeConfig.extractObjectStoreOptions(hadoopConf, partitionFile.pathUri)
+
+            val useJni =
+              COMET_USE_JNI_OBJECT_STORE.get(scan.relation.sparkSession.sessionState.conf)
+            val objectStoreOptions = baseOptions.updated("use_jni", useJni.toString)
+
             objectStoreOptions.foreach { case (key, value) =>
              nativeScanBuilder.putObjectStoreOptions(key, value)
            }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index f94e53ed94..555e38c25e 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -103,19 +103,26 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
   }
 
   test("read parquet file from MinIO") {
+    runParquetScanAndAssert()
+  }
+
+  test("Comet uses JNI object store when use_jni is true") {
+    spark.conf.set("spark.comet.use_jni_object_store", "true")
+    runParquetScanAndAssert()
+  }
+
+  private def runParquetScanAndAssert(): Unit = {
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
     val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+
     val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
+      case p: CometScanExec => p
+      case p: CometNativeScanExec => p
     }
-    assert(scans.size == 1)
+    assert(scans.size == 1)
     assert(df.first().getLong(0) == 499500)
   }
 }
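
Note for reviewers: a minimal sketch of how the new flag is expected to be exercised end to end. The bucket name, file path, and the explicit fs.s3a.impl setting below are illustrative assumptions, not part of this patch; the native side only takes the JNI branch when `use_jni` is "true" and it finds a `spark.hadoop.fs.<scheme>.impl` entry among the forwarded object-store options, and whether a given setting is forwarded depends on NativeConfig.extractObjectStoreOptions. Comet itself is assumed to be already enabled in the session.

// Sketch only (assumed session setup; bucket and path are placeholders).
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("comet-jni-object-store-sketch")
  // New internal flag added in this PR; defaults to false.
  .config("spark.comet.use_jni_object_store", "true")
  // The native side treats a scheme as Hadoop-backed when a
  // spark.hadoop.fs.<scheme>.impl entry reaches prepare_object_store_with_configs.
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .getOrCreate()

// Scans on this scheme should now route through JniObjectStore ->
// JniHDFSBridge.getLength / JniHDFSBridge.read instead of the Rust s3 store.
val df = spark.read.parquet("s3a://example-bucket/data/test-file.parquet")
println(df.count())

Only the read path (get_opts/head) is implemented on JniObjectStore; put_opts, delete, list, and copy are still todo!(), so the flag only affects scans.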
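
The bridge can also be sanity-checked from the JVM side without going through the native plan. The path below is a placeholder, and an empty config map is assumed to be enough when the target filesystem is reachable with default configuration:

// Hypothetical JVM-side check of JniHDFSBridge (path is a placeholder).
import java.util.Collections
import org.apache.comet.parquet.JniHDFSBridge

val path = "hdfs://namenode:8020/tmp/example.parquet"
val configs = Collections.emptyMap[String, String]()

val len = JniHDFSBridge.getLength(path, configs) // -1 if the file cannot be read
val header = JniHDFSBridge.read(path, configs, 0L, 4) // null on failure
if (len >= 0 && header != null) {
  // Parquet files begin with the 4-byte magic "PAR1".
  println(s"length=$len, magic=${new String(header, "US-ASCII")}")
}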