diff --git a/Makefile b/Makefile
index 80f334a34c..9382d1aeee 100644
--- a/Makefile
+++ b/Makefile
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
 $(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
 endef
 
+# Build optional Comet native features (e.g. hdfs)
+FEATURES_ARG := $(shell ! [ -z "$(COMET_FEATURES)" ] && echo '--features=$(COMET_FEATURES)')
+
 all: core jvm
 
 core:
@@ -95,7 +98,7 @@ release-linux: clean
	cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
	./mvnw install -Prelease -DskipTests $(PROFILES)
 release:
-	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+	cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build --release $(FEATURES_ARG)
	./mvnw install -Prelease -DskipTests $(PROFILES)
 release-nogit:
	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md
index 9607ba6038..27c5492d89 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -35,3 +35,81 @@ converted into Arrow format, allowing native execution to happen after that.
 
 Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is
 immediately converted into Arrow format, allowing native execution to happen after that.
+
+# Supported Storages
+
+## Local
+In progress
+
+## HDFS
+
+The Apache DataFusion Comet native reader seamlessly scans files from remote HDFS for [supported formats](#supported-spark-data-sources).
+
+### Using the experimental native DataFusion reader
+Unlike the native Comet reader, the DataFusion reader fully supports nested types. This reader is currently experimental.
+
+Building Comet with the native DataFusion reader and remote HDFS support requires a JDK to be installed.
+
+Example: to build Comet for `spark-3.4`, provide the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`.
+The linker path can vary depending on the system; typically the JRE linker is part of the installed JDK.
+
+```shell
+export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
+make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
+```
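+
+For reference, with `COMET_FEATURES=hdfs` and the flags above, the `release` target expands to roughly the following
+commands (the exact `RUSTFLAGS` value depends on where your JDK is installed):
+
+```shell
+cd native && RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server -Ctarget-cpu=native" cargo build --release --features=hdfs
+./mvnw install -Prelease -DskipTests -Pspark-3.4
+```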
+
+Start Comet with the experimental reader and HDFS support as [described](installation.md#run-spark-shell-with-comet-enabled),
+adding the following parameters:
+
+```shell
+--conf spark.comet.scan.impl=native_datafusion \
+--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
+--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
+--conf dfs.client.use.datanode.hostname=true
+```
+
+Query a struct type from remote HDFS:
+
+```shell
+spark.read.parquet("hdfs://namenode:9000/user/data").show(false)
+
+root
+ |-- id: integer (nullable = true)
+ |-- first_name: string (nullable = true)
+ |-- personal_info: struct (nullable = true)
+ |    |-- firstName: string (nullable = true)
+ |    |-- lastName: string (nullable = true)
+ |    |-- ageInYears: integer (nullable = true)
+
+25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized
+== Physical Plan ==
+* CometColumnarToRow (2)
++- CometNativeScan:  (1)
+
+(1) CometNativeScan:
+Output [3]: [id#0, first_name#1, personal_info#4]
+Arguments: [id#0, first_name#1, personal_info#4]
+
+(2) CometColumnarToRow [codegen id : 1]
+Input [3]: [id#0, first_name#1, personal_info#4]
+
+25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
++---+----------+-----------------+
+|id |first_name|personal_info    |
++---+----------+-----------------+
+|2  |Jane      |{Jane, Smith, 34}|
+|1  |John      |{John, Doe, 28}  |
++---+----------+-----------------+
+```
+
+Verify that the native scan type is `CometNativeScan`.
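+
+The configured scan implementation can also be read back from the running session as a quick sanity check
+(the authoritative confirmation is the `CometNativeScan` node in the physical plan above):
+
+```shell
+scala> spark.conf.get("spark.comet.scan.impl")
+res0: String = native_datafusion
+```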
+
+More on the [HDFS Reader](../../../native/hdfs/README.md)
+
+## S3
+In progress
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 878b2b7cf3..f42a9ed192 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -106,7 +106,6 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion_expr::{
     AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
@@ -1165,12 +1164,9 @@ impl PhysicalPlanner {
                 ))
             });
 
-            let object_store = object_store::local::LocalFileSystem::new();
-            // register the object store with the runtime environment
-            let url = Url::try_from("file://").unwrap();
-            self.session_ctx
-                .runtime_env()
-                .register_object_store(&url, Arc::new(object_store));
+            // By default, the local FS object store is registered;
+            // if the `hdfs` feature is enabled, the HDFS object store is registered instead
+            let object_store_url = register_object_store(Arc::clone(&self.session_ctx))?;
 
             // Generate file groups
             let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1229,8 +1225,6 @@ impl PhysicalPlanner {
 
             // TODO: I think we can remove partition_count in the future, but leave for testing.
             assert_eq!(file_groups.len(), partition_count);
-
-            let object_store_url = ObjectStoreUrl::local_filesystem();
             let partition_fields: Vec<Field> = partition_schema
                 .fields()
                 .iter()
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 0fa671a307..3932652045 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::execution::operators::ExecutionError;
 use arrow::{
     array::{cast::AsArray, types::Int32Type, Array, ArrayRef},
     compute::{cast_with_options, take, CastOptions},
@@ -22,9 +23,11 @@ use arrow::{
 };
 use arrow_array::{DictionaryArray, StructArray};
 use arrow_schema::DataType;
+use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::utils::array_with_timezone;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::{Result as DataFusionResult, ScalarValue};
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::ColumnarValue;
 use std::collections::HashMap;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
@@ -195,3 +198,39 @@ fn cast_struct_to_struct(
         _ => unreachable!(),
     }
 }
+
+// Default object store, which is the local filesystem
+#[cfg(not(feature = "hdfs"))]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    let object_store = object_store::local::LocalFileSystem::new();
+    let url = ObjectStoreUrl::parse("file://")?;
+    session_context
+        .runtime_env()
+        .register_object_store(url.as_ref(), Arc::new(object_store));
+    Ok(url)
+}
+
+// HDFS object store
+#[cfg(feature = "hdfs")]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    // TODO: read the namenode configuration from the file schema or from spark.defaultFS
+    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
+    if let Some(object_store) =
+        datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        session_context
+            .runtime_env()
+            .register_object_store(url.as_ref(), Arc::new(object_store));
+
+        return Ok(url);
+    }
+
+    Err(ExecutionError::GeneralError(format!(
+        "HDFS object store cannot be created for {}",
+        url
+    )))
+}