From f099ff804dcd9d6a0f5530caa48d427a7747339d Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 30 Jan 2025 16:58:04 -0800 Subject: [PATCH 1/7] feat: add experimental remote HDFS support for native DataFusion reader --- Makefile | 5 +- docs/source/user-guide/datasources.md | 76 ++++++++++++++++++++++ native/core/src/execution/planner.rs | 13 ++-- native/core/src/parquet/parquet_support.rs | 37 +++++++++++ 4 files changed, 122 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 80f334a34c..13c668d8a2 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ define spark_jvm_17_extra_args $(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[') endef +# Build optional Comet native features (like hdfs e.g) +FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo '--features=$(COMET_FEATURES)') + all: core jvm core: @@ -95,7 +98,7 @@ release-linux: clean cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release ./mvnw install -Prelease -DskipTests $(PROFILES) release: - cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release + cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" && RUSTFLAGS=$$RUSTFLAGS cargo build --release $(FEATURES_ARG) ./mvnw install -Prelease -DskipTests $(PROFILES) release-nogit: cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index 9607ba6038..a152f85c14 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -35,3 +35,79 @@ converted into Arrow format, allowing native execution to happen after that. Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately converted into Arrow format, allowing native execution to happen after that. 
+ +# Supported Storages + +## Local +In progress + +## HDFS + +Apache DataFusion Comet native reader seamlessly scans files from remote HDFS for [supported formats](#supported-spark-data-sources) + +### Using experimental native DataFusion reader +Unlike the native Comet reader, the DataFusion reader fully supports nested type processing. This reader is currently experimental only + +To build Comet with the native DataFusion reader and remote HDFS support, it is required to have a JDK installed + +Example: +Build Comet for `spark-3.4`, providing a JDK path in `JAVA_HOME` +Provide the JRE linker path in `RUSTFLAGS`; the path can vary depending on the system. Typically the JRE linker is part of the installed JDK + +```shell +export JAVA_HOME="/opt/homebrew/opt/openjdk@11" +make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server" +``` + +Start Comet with the experimental reader and HDFS support as [described](installation.md/#run-spark-shell-with-comet-enabled) +and add additional parameters + +```shell +--conf spark.comet.scan.impl=native_datafusion \ +--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \ +--conf spark.hadoop.dfs.client.use.datanode.hostname=true \ +--conf dfs.client.use.datanode.hostname=true +``` + +Query a struct type from remote HDFS +```shell +spark.read.parquet("hdfs://namenode:9000/user/data").show(false) + +root + |-- id: integer (nullable = true) + |-- first_name: string (nullable = true) + |-- personal_info: struct (nullable = true) + | |-- firstName: string (nullable = true) + | |-- lastName: string (nullable = true) + | |-- ageInYears: integer (nullable = true) + +25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized +== Physical Plan == +* CometColumnarToRow (2) ++- CometNativeScan: (1) + + +(1) CometNativeScan: +Output [3]: [id#0, first_name#1, personal_info#4] +Arguments: [id#0, first_name#1, personal_info#4] + +(2) CometColumnarToRow [codegen 
id : 1] +Input [3]: [id#0, first_name#1, personal_info#4] + + +25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000) ++---+----------+-----------------+ +|id |first_name|personal_info | ++---+----------+-----------------+ +|2 |Jane |{Jane, Smith, 34}| +|1 |John |{John, Doe, 28} | ++---+----------+-----------------+ + + + +``` + +Verify the native scan type should be `CometNativeScan`. + +## S3 +In progress diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 878b2b7cf3..73f2a28b25 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -74,7 +74,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use crate::execution::shuffle::CompressionCodec; use crate::execution::spark_plan::SparkPlan; -use crate::parquet::parquet_support::SparkParquetOptions; +use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions}; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; @@ -1165,12 +1165,9 @@ impl PhysicalPlanner { )) }); - let object_store = object_store::local::LocalFileSystem::new(); - // register the object store with the runtime environment - let url = Url::try_from("file://").unwrap(); - self.session_ctx - .runtime_env() - .register_object_store(&url, Arc::new(object_store)); + // By default, local FS object store registered + // if `hdfs` feature enabled then HDFS file object store registered + register_object_store(Arc::clone(&self.session_ctx))?; // Generate file groups let mut file_groups: Vec> = @@ -1230,7 +1227,7 @@ impl PhysicalPlanner { // TODO: I think we can remove partition_count in the future, but leave for testing. 
assert_eq!(file_groups.len(), partition_count); - let object_store_url = ObjectStoreUrl::local_filesystem(); + let object_store_url = ObjectStoreUrl::parse("hdfs://namenode:9000").unwrap(); let partition_fields: Vec = partition_schema .fields() .iter() diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 248f2babd6..d5c8655a80 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -40,6 +40,8 @@ use datafusion_comet_spark_expr::{timezone, EvalMode, SparkError, SparkResult}; use datafusion_common::{cast::as_generic_string_array, Result as DataFusionResult, ScalarValue}; use datafusion_expr::ColumnarValue; // use datafusion_physical_expr::PhysicalExpr; +use crate::execution::operators::ExecutionError; +use datafusion::prelude::SessionContext; use num::{ cast::AsPrimitive, integer::div_floor, traits::CheckedNeg, CheckedSub, Integer, Num, ToPrimitive, @@ -48,6 +50,7 @@ use regex::Regex; use std::collections::HashMap; use std::str::FromStr; use std::{fmt::Debug, hash::Hash, num::Wrapping, sync::Arc}; +use url::Url; static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f"); @@ -1861,6 +1864,40 @@ fn trim_end(s: &str) -> &str { } } +#[cfg(not(feature = "hdfs"))] +pub(crate) fn register_object_store( + session_context: Arc, +) -> Result<(), ExecutionError> { + let object_store = object_store::local::LocalFileSystem::new(); + let url = Url::try_from("file://").unwrap(); + session_context + .runtime_env() + .register_object_store(&url, Arc::new(object_store)); + Ok(()) +} + +#[cfg(feature = "hdfs")] +pub(crate) fn register_object_store( + session_context: Arc, +) -> Result<(), ExecutionError> { + // TODO: read the namenode configuration from file schema or from spark.defaultFS + let url = Url::try_from("hdfs://namenode:9000").unwrap(); + if let Some(object_store) = + datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new((&url).as_ref()) + { + 
session_context + .runtime_env() + .register_object_store(&url, Arc::new(object_store)); + + return Ok(()); + } + + Err(ExecutionError::GeneralError(format!( + "HDFS object store cannot be created for {}", + url + ))) +} + #[cfg(test)] mod tests { use arrow::datatypes::TimestampMicrosecondType; From 36600d2acbbadd260913c17f07431c0b4e784756 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 30 Jan 2025 17:22:58 -0800 Subject: [PATCH 2/7] fmt --- native/core/src/execution/planner.rs | 5 +---- native/core/src/parquet/parquet_support.rs | 23 +++++++++++----------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 73f2a28b25..f42a9ed192 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -106,7 +106,6 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, JoinType as DFJoinType, ScalarValue, }; -use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression; use datafusion_expr::{ AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -1167,7 +1166,7 @@ impl PhysicalPlanner { // By default, local FS object store registered // if `hdfs` feature enabled then HDFS file object store registered - register_object_store(Arc::clone(&self.session_ctx))?; + let object_store_url = register_object_store(Arc::clone(&self.session_ctx))?; // Generate file groups let mut file_groups: Vec> = @@ -1226,8 +1225,6 @@ impl PhysicalPlanner { // TODO: I think we can remove partition_count in the future, but leave for testing. 
assert_eq!(file_groups.len(), partition_count); - - let object_store_url = ObjectStoreUrl::parse("hdfs://namenode:9000").unwrap(); let partition_fields: Vec = partition_schema .fields() .iter() diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index d5c8655a80..7e1cf679c9 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::execution::operators::ExecutionError; use arrow::{ array::{ cast::AsArray, @@ -35,13 +36,12 @@ use arrow_array::builder::StringBuilder; use arrow_array::{DictionaryArray, StringArray, StructArray}; use arrow_schema::DataType; use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike}; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::prelude::SessionContext; use datafusion_comet_spark_expr::utils::array_with_timezone; use datafusion_comet_spark_expr::{timezone, EvalMode, SparkError, SparkResult}; use datafusion_common::{cast::as_generic_string_array, Result as DataFusionResult, ScalarValue}; use datafusion_expr::ColumnarValue; -// use datafusion_physical_expr::PhysicalExpr; -use crate::execution::operators::ExecutionError; -use datafusion::prelude::SessionContext; use num::{ cast::AsPrimitive, integer::div_floor, traits::CheckedNeg, CheckedSub, Integer, Num, ToPrimitive, @@ -50,7 +50,6 @@ use regex::Regex; use std::collections::HashMap; use std::str::FromStr; use std::{fmt::Debug, hash::Hash, num::Wrapping, sync::Arc}; -use url::Url; static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f"); @@ -1867,29 +1866,29 @@ fn trim_end(s: &str) -> &str { #[cfg(not(feature = "hdfs"))] pub(crate) fn register_object_store( session_context: Arc, -) -> Result<(), ExecutionError> { +) -> Result { let object_store = object_store::local::LocalFileSystem::new(); - let url = Url::try_from("file://").unwrap(); + let url 
= ObjectStoreUrl::parse("file://").unwrap(); session_context .runtime_env() - .register_object_store(&url, Arc::new(object_store)); - Ok(()) + .register_object_store((&url).as_ref(), Arc::new(object_store)); + Ok(url) } #[cfg(feature = "hdfs")] pub(crate) fn register_object_store( session_context: Arc, -) -> Result<(), ExecutionError> { +) -> Result { // TODO: read the namenode configuration from file schema or from spark.defaultFS - let url = Url::try_from("hdfs://namenode:9000").unwrap(); + let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?; if let Some(object_store) = datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new((&url).as_ref()) { session_context .runtime_env() - .register_object_store(&url, Arc::new(object_store)); + .register_object_store((&url).as_ref(), Arc::new(object_store)); - return Ok(()); + return Ok(url); } Err(ExecutionError::GeneralError(format!( From 19c8bed44cc1ea898e52d5547d773098ff57b1bc Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 30 Jan 2025 17:48:59 -0800 Subject: [PATCH 3/7] fmt --- native/core/src/parquet/parquet_support.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 7e1cf679c9..9ab635d1c7 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -1863,6 +1863,7 @@ fn trim_end(s: &str) -> &str { } } +// Default object store which is local filesystem #[cfg(not(feature = "hdfs"))] pub(crate) fn register_object_store( session_context: Arc, @@ -1871,10 +1872,11 @@ pub(crate) fn register_object_store( let url = ObjectStoreUrl::parse("file://").unwrap(); session_context .runtime_env() - .register_object_store((&url).as_ref(), Arc::new(object_store)); + .register_object_store(url.as_ref(), Arc::new(object_store)); Ok(url) } +// HDFS object store #[cfg(feature = "hdfs")] pub(crate) fn register_object_store( session_context: Arc, @@ -1886,7 
+1888,7 @@ pub(crate) fn register_object_store( { session_context .runtime_env() - .register_object_store((&url).as_ref(), Arc::new(object_store)); + .register_object_store(url.as_ref(), Arc::new(object_store)); return Ok(url); } From d9b33aa9ae56aeae478191cf6d191a78620592e1 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 30 Jan 2025 17:50:25 -0800 Subject: [PATCH 4/7] fmt --- native/core/src/parquet/parquet_support.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 9ab635d1c7..ba7e4ddd17 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -1869,7 +1869,7 @@ pub(crate) fn register_object_store( session_context: Arc, ) -> Result { let object_store = object_store::local::LocalFileSystem::new(); - let url = ObjectStoreUrl::parse("file://").unwrap(); + let url = ObjectStoreUrl::parse("file://")?; session_context .runtime_env() .register_object_store(url.as_ref(), Arc::new(object_store)); @@ -1884,7 +1884,7 @@ pub(crate) fn register_object_store( // TODO: read the namenode configuration from file schema or from spark.defaultFS let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?; if let Some(object_store) = - datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new((&url).as_ref()) + datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) { session_context .runtime_env() From ad030ca7de6e81315d14f7e16316d94f20d92c0d Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 4 Feb 2025 16:22:09 -0800 Subject: [PATCH 5/7] fmt --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 13c668d8a2..9382d1aeee 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,7 @@ release-linux: clean cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release ./mvnw install -Prelease -DskipTests 
$(PROFILES) release: - cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" && RUSTFLAGS=$$RUSTFLAGS cargo build --release $(FEATURES_ARG) + cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build --release $(FEATURES_ARG) ./mvnw install -Prelease -DskipTests $(PROFILES) release-nogit: cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release From cb53370d00122f41f116865039f737fbfee39d3d Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 13 Feb 2025 17:09:38 -0800 Subject: [PATCH 6/7] update references --- docs/source/user-guide/datasources.md | 2 ++ native/core/src/parquet/parquet_support.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index a152f85c14..27c5492d89 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -109,5 +109,7 @@ Input [3]: [id#0, first_name#1, personal_info#4] Verify the native scan type should be `CometNativeScan`. 
+More on [HDFS Reader](../../../native/hdfs/README.md) + ## S3 In progress diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index ba7e4ddd17..2b7af3ff66 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -1884,7 +1884,7 @@ pub(crate) fn register_object_store( // TODO: read the namenode configuration from file schema or from spark.defaultFS let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?; if let Some(object_store) = - datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) + datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) { session_context .runtime_env() From 66b6e7fb2ef2bf79c0a438b14f5d9ddd03762ab4 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 15 Feb 2025 12:21:31 -0800 Subject: [PATCH 7/7] merge --- native/core/src/parquet/parquet_support.rs | 39 ++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 0fa671a307..3932652045 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::execution::operators::ExecutionError; use arrow::{ array::{cast::AsArray, types::Int32Type, Array, ArrayRef}, compute::{cast_with_options, take, CastOptions}, @@ -22,9 +23,11 @@ use arrow::{ }; use arrow_array::{DictionaryArray, StructArray}; use arrow_schema::DataType; +use datafusion::prelude::SessionContext; use datafusion_comet_spark_expr::utils::array_with_timezone; use datafusion_comet_spark_expr::EvalMode; use datafusion_common::{Result as DataFusionResult, ScalarValue}; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::ColumnarValue; use std::collections::HashMap; use std::{fmt::Debug, hash::Hash, sync::Arc}; @@ -195,3 +198,39 @@ fn cast_struct_to_struct( _ => unreachable!(), } } + +// Default object store which is local filesystem +#[cfg(not(feature = "hdfs"))] +pub(crate) fn register_object_store( + session_context: Arc, +) -> Result { + let object_store = object_store::local::LocalFileSystem::new(); + let url = ObjectStoreUrl::parse("file://")?; + session_context + .runtime_env() + .register_object_store(url.as_ref(), Arc::new(object_store)); + Ok(url) +} + +// HDFS object store +#[cfg(feature = "hdfs")] +pub(crate) fn register_object_store( + session_context: Arc, +) -> Result { + // TODO: read the namenode configuration from file schema or from spark.defaultFS + let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?; + if let Some(object_store) = + datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) + { + session_context + .runtime_env() + .register_object_store(url.as_ref(), Arc::new(object_store)); + + return Ok(url); + } + + Err(ExecutionError::GeneralError(format!( + "HDFS object store cannot be created for {}", + url + ))) +}