5 changes: 4 additions & 1 deletion Makefile
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
$(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
endef

# Build optional Comet native features (e.g. hdfs)
FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo '--features=$(COMET_FEATURES)')

all: core jvm

core:
@@ -95,7 +98,7 @@ release-linux: clean
cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build --release $(FEATURES_ARG)
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
78 changes: 78 additions & 0 deletions docs/source/user-guide/datasources.md
@@ -35,3 +35,81 @@ converted into Arrow format, allowing native execution to happen after that.

Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
converted into Arrow format, allowing native execution to happen after that.

# Supported Storages

## Local
In progress

## HDFS

The Apache DataFusion Comet native reader seamlessly scans files from remote HDFS for [supported formats](#supported-spark-data-sources).

### Using the experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports nested type processing. This reader is currently experimental.

To build Comet with the native DataFusion reader and remote HDFS support, a JDK must be installed.

Example: to build Comet for `spark-3.4`, provide the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`. The linker path can vary depending on the system; typically the JRE linker is part of the installed JDK.

```shell
export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
# (review comment, Contributor) nit: is JAVA_HOME still the requirement?
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
```

Start Comet with the experimental reader and HDFS support as [described](installation.md/#run-spark-shell-with-comet-enabled), adding the following parameters:

```shell
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true
```

Query a struct type from remote HDFS:
```shell
spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
|-- id: integer (nullable = true)
|-- first_name: string (nullable = true)
|-- personal_info: struct (nullable = true)
| |-- firstName: string (nullable = true)
| |-- lastName: string (nullable = true)
| |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)


(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]


25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info |
+---+----------+-----------------+
|2 |Jane |{Jane, Smith, 34}|
|1 |John |{John, Doe, 28} |
+---+----------+-----------------+



```

Verify that the native scan type is `CometNativeScan`.

More on the [HDFS Reader](../../../native/hdfs/README.md).

## S3
In progress
14 changes: 4 additions & 10 deletions native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio

use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -106,7 +106,6 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
JoinType as DFJoinType, ScalarValue,
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
use datafusion_expr::{
AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
@@ -1165,12 +1164,9 @@ impl PhysicalPlanner {
))
});

let object_store = object_store::local::LocalFileSystem::new();
// register the object store with the runtime environment
let url = Url::try_from("file://").unwrap();
self.session_ctx
.runtime_env()
.register_object_store(&url, Arc::new(object_store));
// By default, the local FS object store is registered;
// if the `hdfs` feature is enabled, the HDFS object store is registered instead.
let object_store_url = register_object_store(Arc::clone(&self.session_ctx))?;
Contributor: Should we update this function (get_file_path) as well? It's currently used by NATIVE_ICEBERG_COMPAT but the goal is to unify it with COMET_DATAFUSION.

Contributor Author: That's a good point; to verify it we probably need to read Iceberg from HDFS, which can be done in #1367.

Contributor: We don't need to wait for actual Iceberg integration. CometScan will use COMPAT_ICEBERG if the configuration is set (that's how we are able to run the unit tests).

Contributor: @comphead we can log a follow-up issue to update get_file_path if you like.

Contributor Author: Thanks @parthchandra, let's create a follow-up ticket. I'd appreciate it if you do, as I'm afraid I might miss some Iceberg details in the ticket description.

Contributor: #1407. There's no detail in the PR. Can you assign it to me if possible, and I'll remember to take care of it.


// Generate file groups
let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1229,8 +1225,6 @@ impl PhysicalPlanner {

// TODO: I think we can remove partition_count in the future, but leave for testing.
assert_eq!(file_groups.len(), partition_count);

let object_store_url = ObjectStoreUrl::local_filesystem();
let partition_fields: Vec<Field> = partition_schema
.fields()
.iter()
39 changes: 39 additions & 0 deletions native/core/src/parquet/parquet_support.rs
@@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::ExecutionError;
use arrow::{
array::{cast::AsArray, types::Int32Type, Array, ArrayRef},
compute::{cast_with_options, take, CastOptions},
util::display::FormatOptions,
};
use arrow_array::{DictionaryArray, StructArray};
use arrow_schema::DataType;
use datafusion::prelude::SessionContext;
use datafusion_comet_spark_expr::utils::array_with_timezone;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_common::{Result as DataFusionResult, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::ColumnarValue;
use std::collections::HashMap;
use std::{fmt::Debug, hash::Hash, sync::Arc};
@@ -195,3 +198,39 @@ fn cast_struct_to_struct(
_ => unreachable!(),
}
}

// Default object store, which is the local filesystem
#[cfg(not(feature = "hdfs"))]
Contributor Author: The `hdfs` cargo feature enables conditional compilation when HDFS support is needed.

pub(crate) fn register_object_store(
session_context: Arc<SessionContext>,
) -> Result<ObjectStoreUrl, ExecutionError> {
let object_store = object_store::local::LocalFileSystem::new();
Contributor: It doesn't have to be only a local file system.

Contributor Author (@comphead, Jan 31, 2025): It depends on the features enabled for Comet. LocalFileSystem is the default if no specific features are selected; the annotation on this method is

#[cfg(not(feature = "hdfs"))]

This allows plugging in other backends like S3, etc.

This particular method handles the case where no remote feature is selected, i.e. the local filesystem. If a feature is selected, the conditional compilation registers an object store related to that feature, like HDFS or S3.

let url = ObjectStoreUrl::parse("file://")?;
session_context
.runtime_env()
.register_object_store(url.as_ref(), Arc::new(object_store));
Ok(url)
}
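
Not part of the diff: following the explanation above about plugging in other backends via cargo features, here is a hypothetical sketch of what an additional variant could look like, using an imagined `s3` feature and the `object_store` crate's AWS builder. The feature name, bucket, and error mapping are assumptions for illustration only; the `hdfs` variant that the PR actually adds follows below.

```rust
// Hypothetical sketch: an imagined `s3` cargo feature following the same
// conditional-compilation pattern as the `hdfs` variant below. In a real build
// the cfg conditions on the default, hdfs, and s3 variants would have to be
// mutually exclusive.
#[cfg(feature = "s3")]
pub(crate) fn register_object_store(
    session_context: Arc<SessionContext>,
) -> Result<ObjectStoreUrl, ExecutionError> {
    use object_store::aws::AmazonS3Builder;

    // The bucket name is a placeholder; a real implementation would derive it
    // from the scan paths or from Spark configuration.
    let url = ObjectStoreUrl::parse("s3://my-bucket")?;
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket")
        .build()
        .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
    session_context
        .runtime_env()
        .register_object_store(url.as_ref(), Arc::new(store));
    Ok(url)
}
```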

// HDFS object store
#[cfg(feature = "hdfs")]
pub(crate) fn register_object_store(
session_context: Arc<SessionContext>,
) -> Result<ObjectStoreUrl, ExecutionError> {
// TODO: read the namenode configuration from file schema or from spark.defaultFS
Member: Do we need to register the object store from native_scan.file_partitions?

Contributor Author: Thanks @wForget, I'm not sure I'm getting it. Do you mean a better place to register the object store would be inside the file_partitions iterator loop?

Member:

> do you mean the better place to register the object store will be inside file_partitions iterator loop?

Yes, is it possible that native scan paths correspond to multiple object stores or are different from spark.defaultFs?

Contributor Author: For HDFS/S3 the default FS can be taken from the spark.hadoop.fs.defaultFS parameter. Supporting multiple object stores is an interesting idea, however I'm not sure when it can be addressed.

Member:

> for HDFS/S3 the default fs can be taken from spark.hadoop.fs.defaultFS parameter.

Sometimes I also access other HDFS namespaces, like:

select * from `parquet`.`hdfs://other-ns:8020/warehouse/db/table`

Contributor Author: That is an interesting scenario, I'll add a separate test case for this.

let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
if let Some(object_store) =
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
{
session_context
.runtime_env()
.register_object_store(url.as_ref(), Arc::new(object_store));

return Ok(url);
}

Err(ExecutionError::GeneralError(format!(
"HDFS object store cannot be created for {}",
url
)))
}
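
Not part of the diff: the review thread above raises scan paths that point at object stores other than spark.hadoop.fs.defaultFS (for example `hdfs://other-ns:8020/warehouse/db/table`). A rough, hypothetical sketch of deriving the registration from each scan path's scheme and authority, rather than a single hard-coded default, might look like the following; the helper name is invented and the register calls simply mirror the ones used in this PR.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_execution::object_store::ObjectStoreUrl;
use url::Url;

use crate::execution::operators::ExecutionError;

// Hypothetical helper: register an object store keyed on the scheme and
// authority of a concrete scan path, so hdfs://ns-a:8020/... and
// hdfs://ns-b:8020/... can coexist instead of relying on one default namenode.
pub(crate) fn register_object_store_for_path(
    session_context: &SessionContext,
    file_path: &str,
) -> Result<ObjectStoreUrl, ExecutionError> {
    // Paths without a scheme (or plain local paths) fall back to the local FS.
    let parsed = Url::parse(file_path).ok();
    let base = match &parsed {
        Some(u) if u.scheme() != "file" => match u.port() {
            Some(port) => format!("{}://{}:{}", u.scheme(), u.host_str().unwrap_or(""), port),
            None => format!("{}://{}", u.scheme(), u.host_str().unwrap_or("")),
        },
        _ => "file://".to_string(),
    };
    let url = ObjectStoreUrl::parse(&base)?;

    match parsed.as_ref().map(|u| u.scheme()) {
        #[cfg(feature = "hdfs")]
        Some("hdfs") => {
            // Same constructor the PR uses for the defaultFS case.
            if let Some(store) =
                datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(
                    url.as_ref(),
                )
            {
                session_context
                    .runtime_env()
                    .register_object_store(url.as_ref(), Arc::new(store));
                return Ok(url);
            }
            Err(ExecutionError::GeneralError(format!(
                "HDFS object store cannot be created for {}",
                url
            )))
        }
        _ => {
            let store = object_store::local::LocalFileSystem::new();
            session_context
                .runtime_env()
                .register_object_store(url.as_ref(), Arc::new(store));
            Ok(url)
        }
    }
}
```

The point of the sketch is keying registrations on scheme and authority so that queries against other HDFS namespaces (like the `other-ns` example above) can coexist with the default namespace; whether that registration should happen while iterating file_partitions, as suggested above, is left open.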