diff --git a/README.md b/README.md
index 195cd9d693..c3159820b3 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,8 @@ Apache DataFusion Comet is a high-performance accelerator for Apache Spark, buil
performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
Spark ecosystem without requiring any code changes.

+Comet also accelerates Apache Iceberg when performing Parquet scans from Spark.
+
[Apache DataFusion]: https://datafusion.apache.org

# Benefits of Using Comet
diff --git a/docs/source/contributor-guide/benchmarking_aws_ec2.md b/docs/source/contributor-guide/benchmarking_aws_ec2.md
index 0ec33bf7ed..bf0b89f67d 100644
--- a/docs/source/contributor-guide/benchmarking_aws_ec2.md
+++ b/docs/source/contributor-guide/benchmarking_aws_ec2.md
@@ -179,8 +179,8 @@ $SPARK_HOME/bin/spark-submit \
Install Comet JAR from Maven:

```shell
-wget https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.7.0/comet-spark-spark3.5_2.12-0.7.0.jar -P $SPARK_HOME/jars
-export COMET_JAR=$SPARK_HOME/jars/comet-spark-spark3.5_2.12-0.7.0.jar
+wget https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar -P $SPARK_HOME/jars
+export COMET_JAR=$SPARK_HOME/jars/comet-spark-spark3.5_2.12-0.9.0.jar
```

Run the following command (the `--data` parameter will need to be updated to point to your S3 bucket):
diff --git a/docs/source/index.rst b/docs/source/index.rst
index e8efc2ea76..a47c8f1f54 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,7 +43,6 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
   Comet Overview
   Installing Comet
   Building From Source
-   Kubernetes Guide
   Supported Data Sources
   Supported Data Types
   Supported Operators
@@ -52,6 +51,8 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
   Compatibility Guide
   Tuning Guide
   Metrics Guide
+   Iceberg Guide
+   Kubernetes Guide

.. _toc.contributor-guide-links:
.. toctree::
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 5f0842eb4d..84c4aab0e9 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -38,15 +38,19 @@ Comet does not support reading decimals encoded in binary format.
### Parquet Scans

Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
-`spark.comet.scan.impl` is used to select an implementation.
-
-| Implementation          | Description                                                                                                                                                                                |
-| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
-| `native_comet`          | This is the default implementation. It provides strong compatibility with Spark but does not support complex types.                                                                       |
-| `native_datafusion`     | This implementation delegates to DataFusion's `DataSourceExec`.                                                                                                                            |
-| `native_iceberg_compat` | This implementation also delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future.  |
-
-The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet`
+`spark.comet.scan.impl` is used to select an implementation.
The default setting is `spark.comet.scan.impl=auto`, and
+Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration
+settings. Most users should not need to change this setting. However, it is possible to force Comet to try to use
+a particular implementation for all scan operations by setting this configuration property to one of the following
+implementations.
+
+| Implementation          | Description                                                                                                                                                                           |
+| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `native_comet`          | This implementation provides strong compatibility with Spark but does not support complex types. This is the original scan implementation in Comet and may eventually be removed.    |
+| `native_iceberg_compat` | This implementation delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. |
+| `native_datafusion`     | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan.                    |
+
+The `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet`
implementation:

@@ -54,36 +58,28 @@ implementation:

- Leverages the DataFusion community's ongoing improvements to `DataSourceExec`
- Removes the use of reusable mutable-buffers in Comet, which is complex to maintain
- Improves performance

-The new scans currently have the following limitations:
-
-Issues common to both `native_datafusion` and `native_iceberg_compat`:
+The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:

- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-rather than signed. By default, Comet will fall back to Spark when scanning Parquet files containing `byte` or `short`
-types (regardless of the logical type). This behavior can be disabled by setting
-`spark.comet.scan.allowIncompatible=true`.
-- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
-information.
-- Reading maps containing complex types can result in errors or incorrect results [#1754]
-- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
-- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
+  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
+  logical types. Arrow-based readers, such as DataFusion and Comet, respect these types and read the data as unsigned
+  rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short`
+  types (regardless of the logical type). This behavior can be disabled by setting
+  `spark.comet.scan.allowIncompatible=true`.
- No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
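+
+As a rough illustration of the configuration properties discussed above, the following `spark-shell` invocation
+forces the `native_iceberg_compat` scan and opts in to the incompatible unsigned-integer behavior. This is a
+sketch only: it assumes Comet is installed as described in the installation guide, with `$COMET_JAR` pointing at
+the Comet JAR, and omits other required settings such as off-heap memory.
+
+```shell
+$SPARK_HOME/bin/spark-shell \
+    --jars $COMET_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.comet.scan.impl=native_iceberg_compat \
+    --conf spark.comet.scan.allowIncompatible=true
+```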
-- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.

-Issues specific to `native_datafusion`:
+The `native_datafusion` scan has some additional limitations:

- Bucketed scans are not supported
- No support for row indexes
+- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
+- There are failures in the Spark SQL test suite [#1545]
+- Setting the Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with this scan

[#1545]: https://github.com/apache/datafusion-comet/issues/1545
-[#1542]: https://github.com/apache/datafusion-comet/issues/1542
-[#1754]: https://github.com/apache/datafusion-comet/issues/1754
[#1758]: https://github.com/apache/datafusion-comet/issues/1758
-[Comet Tuning Guide]: tuning.md

-## ANSI mode
+## ANSI Mode

Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
@@ -92,7 +88,7 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully
implement ANSI support.

-## Floating number comparison
+## Floating-point Number Comparison

Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization
rule in Spark. However, one exception is comparison. Spark does not normalize NaN and zero when comparing values
@@ -125,104 +121,104 @@ Cast operations in Comet fall into three levels of support:

- **Compatible**: The results match Apache Spark
- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
-will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
-`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
-recommended for production use.
+  will produce incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
-Spark.
+  Spark.

### Compatible Casts

The following cast operations are generally compatible with Spark except for the differences noted here.
-| From Type | To Type | Notes | -|-|-|-| -| boolean | byte | | -| boolean | short | | -| boolean | integer | | -| boolean | long | | -| boolean | float | | -| boolean | double | | -| boolean | string | | -| byte | boolean | | -| byte | short | | -| byte | integer | | -| byte | long | | -| byte | float | | -| byte | double | | -| byte | decimal | | -| byte | string | | -| short | boolean | | -| short | byte | | -| short | integer | | -| short | long | | -| short | float | | -| short | double | | -| short | decimal | | -| short | string | | -| integer | boolean | | -| integer | byte | | -| integer | short | | -| integer | long | | -| integer | float | | -| integer | double | | -| integer | string | | -| long | boolean | | -| long | byte | | -| long | short | | -| long | integer | | -| long | float | | -| long | double | | -| long | string | | -| float | boolean | | -| float | byte | | -| float | short | | -| float | integer | | -| float | long | | -| float | double | | -| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| double | boolean | | -| double | byte | | -| double | short | | -| double | integer | | -| double | long | | -| double | float | | -| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| decimal | byte | | -| decimal | short | | -| decimal | integer | | -| decimal | long | | -| decimal | float | | -| decimal | double | | -| decimal | decimal | | -| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not | -| string | boolean | | -| string | byte | | -| string | short | | -| string | integer | | -| string | long | | -| string | binary | | -| string | date | Only supports years between 262143 BC and 262142 AD | -| date | string | | -| timestamp | long | | -| timestamp | string | | -| timestamp | date | | +| From Type | To Type | Notes | +| --------- | ------- | --------------------------------------------------------------------------------------------------------------- | +| boolean | byte | | +| boolean | short | | +| boolean | integer | | +| boolean | long | | +| boolean | float | | +| boolean | double | | +| boolean | string | | +| byte | boolean | | +| byte | short | | +| byte | integer | | +| byte | long | | +| byte | float | | +| byte | double | | +| byte | decimal | | +| byte | string | | +| short | boolean | | +| short | byte | | +| short | integer | | +| short | long | | +| short | float | | +| short | double | | +| short | decimal | | +| short | string | | +| integer | boolean | | +| integer | byte | | +| integer | short | | +| integer | long | | +| integer | float | | +| integer | double | | +| integer | string | | +| long | boolean | | +| long | byte | | +| long | short | | +| long | integer | | +| long | float | | +| long | double | | +| long | string | | +| float | boolean | | +| float | byte | | +| float | short | | +| float | integer | | +| float | long | | +| float | double | | +| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | +| double | boolean | | +| double | byte | | +| double | short | | +| double | integer | | +| double | long | | +| double | float | | +| double | string | There can be differences in precision. 
For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
+| decimal   | byte    |                                                                                                                   |
+| decimal   | short   |                                                                                                                   |
+| decimal   | integer |                                                                                                                   |
+| decimal   | long    |                                                                                                                   |
+| decimal   | float   |                                                                                                                   |
+| decimal   | double  |                                                                                                                   |
+| decimal   | decimal |                                                                                                                   |
+| decimal   | string  | There can be formatting differences in some cases due to Spark using scientific notation where Comet does not    |
+| string    | boolean |                                                                                                                   |
+| string    | byte    |                                                                                                                   |
+| string    | short   |                                                                                                                   |
+| string    | integer |                                                                                                                   |
+| string    | long    |                                                                                                                   |
+| string    | binary  |                                                                                                                   |
+| string    | date    | Only supports years between 262143 BC and 262142 AD                                                              |
+| date      | string  |                                                                                                                   |
+| timestamp | long    |                                                                                                                   |
+| timestamp | string  |                                                                                                                   |
+| timestamp | date    |                                                                                                                   |

### Incompatible Casts

The following cast operations are not compatible with Spark for all inputs and are disabled by default.

-| From Type | To Type | Notes |
-|-|-|-|
-| integer | decimal | No overflow check |
-| long | decimal | No overflow check |
-| float | decimal | There can be rounding differences |
-| double | decimal | There can be rounding differences |
-| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
-| string | timestamp | Not all valid formats are supported |
-| binary | string | Only works for binary data representing valid UTF-8 strings |
+| From Type | To Type   | Notes                                                                                                                                                         |
+| --------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| integer   | decimal   | No overflow check                                                                                                                                             |
+| long      | decimal   | No overflow check                                                                                                                                             |
+| float     | decimal   | There can be rounding differences                                                                                                                             |
+| double    | decimal   | There can be rounding differences                                                                                                                             |
+| string    | float     | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.                                                           |
+| string    | double    | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.                                                           |
+| string    | decimal   | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits   |
+| string    | timestamp | Not all valid formats are supported                                                                                                                           |
+| binary    | string    | Only works for binary data representing valid UTF-8 strings                                                                                                   |

### Unsupported Casts

diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md
index bd28297f91..45a1e18b02 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -28,6 +28,12 @@ in the schema are supported. When this option is not enabled, the scan will fall
enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native
execution to happen after that, but the process may not be efficient.

+### Apache Iceberg
+
+Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for more information.
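+
+As a brief sketch of what this looks like in practice (the full setup, including building Iceberg with Comet
+support, is covered in the Iceberg Guide; `$COMET_JAR` and `$ICEBERG_JAR` are assumed to point at the respective
+JARs):
+
+```shell
+$SPARK_HOME/bin/spark-shell \
+    --jars $COMET_JAR,$ICEBERG_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.sql.iceberg.parquet.reader-type=COMET
+```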
+
+[Iceberg Guide]: iceberg.md
+
### CSV

Comet does not provide native CSV scan, but when `spark.comet.convert.csv.enabled` is enabled, data is immediately
@@ -88,7 +94,7 @@ root
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

-25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.7.0 initialized
+25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized

== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)
diff --git a/docs/source/user-guide/datatypes.md b/docs/source/user-guide/datatypes.md
index 28aae0060b..3b7940dd67 100644
--- a/docs/source/user-guide/datatypes.md
+++ b/docs/source/user-guide/datatypes.md
@@ -39,5 +39,7 @@ The following Spark data types are currently available:
- Timestamp
- TimestampNTZ
- Null
-- Struct
-- Array
+- Complex Types
+  - Struct
+  - Array
+  - Map
diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md
index b191f42a75..bc507e3a65 100644
--- a/docs/source/user-guide/iceberg.md
+++ b/docs/source/user-guide/iceberg.md
@@ -44,22 +44,13 @@ export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.10.0-SNAPSHOT.ja
## Build Iceberg

-Clone the Iceberg repository.
+Clone the Iceberg repository and apply the code changes needed by Comet.

```shell
git clone git@github.com:apache/iceberg.git
-```
-
-It will be necessary to make some small changes to Iceberg:
-
-- Update Gradle files to change Comet version to `0.10.0-SNAPSHOT`.
-- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;`
-- Modify `SparkBatchQueryScan` so that it implements the `SupportsComet` interface
-- Stop shading Parquet by commenting out the following lines in the iceberg-spark build:
-
-```
-// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
-// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+cd iceberg
+git checkout apache-iceberg-1.8.1
+git apply ../datafusion-comet/dev/diffs/iceberg/1.8.1.diff
```

Perform a clean build

@@ -74,7 +65,7 @@ Perform a clean build
Set `ICEBERG_JAR` environment variable.

```shell
-export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.10.0-SNAPSHOT.jar
+export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
```

Launch Spark Shell:

@@ -93,7 +84,7 @@ $SPARK_HOME/bin/spark-shell \
    --conf spark.sql.iceberg.parquet.reader-type=COMET \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g
+    --conf spark.memory.offHeap.size=2g
```

Create an Iceberg table. Note that Comet will not accelerate this part.
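+
+A minimal sketch (hypothetical values; the queries shown below assume a table `t1` with two integer columns `c0`
+and `c1`):
+
+```scala
+// Create and populate a small Iceberg table to query; Comet does not accelerate these writes
+spark.sql("CREATE TABLE t1 (c0 INT, c1 INT) USING iceberg")
+spark.sql("INSERT INTO t1 SELECT CAST(id AS INT), CAST(id AS INT) FROM range(25)")
+```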
@@ -113,12 +104,6 @@ This should produce the following output: ``` scala> spark.sql(s"SELECT * from t1").show() -25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized -25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging): - CollectLimit -+- Project [COMET: toprettystring is not supported] - +- CometScanWrapper - +---+---+ | c0| c1| +---+---+ @@ -145,3 +130,12 @@ scala> spark.sql(s"SELECT * from t1").show() +---+---+ only showing top 20 rows ``` + +Confirm that the query was accelerated by Comet: + +``` +scala> spark.sql(s"SELECT * from t1").explain() +== Physical Plan == +*(1) CometColumnarToRow ++- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: [] +``` \ No newline at end of file diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md index b43753fd97..12f41ef538 100644 --- a/docs/source/user-guide/installation.md +++ b/docs/source/user-guide/installation.md @@ -51,7 +51,7 @@ use only and should not be used in production yet. | Spark Version | Java Version | Scala Version | Comet Tests in CI | Spark SQL Tests in CI | | -------------- | ------------ | ------------- | ----------------- |-----------------------| -| 4.0.0-preview1 | 17 | 2.13 | Yes | Yes | +| 4.0.0 | 17 | 2.13 | Yes | Yes | Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by Cloud Service Providers. diff --git a/docs/source/user-guide/overview.md b/docs/source/user-guide/overview.md index 92dfe2bb94..f011885645 100644 --- a/docs/source/user-guide/overview.md +++ b/docs/source/user-guide/overview.md @@ -30,14 +30,6 @@ The following diagram provides an overview of Comet's architecture. ![Comet Overview](../_static/images/comet-overview.png) -Comet aims to support: - -- a native Parquet implementation, including both reader and writer -- full implementation of Spark operators, including - Filter/Project/Aggregation/Join/Exchange etc. -- full implementation of Spark built-in expressions. -- a UDF framework for users to migrate their existing UDF to native - ## Architecture The following diagram shows how Comet integrates with Apache Spark. diff --git a/docs/source/user-guide/source.md b/docs/source/user-guide/source.md index b7038d341c..e49323b26d 100644 --- a/docs/source/user-guide/source.md +++ b/docs/source/user-guide/source.md @@ -27,7 +27,7 @@ Official source releases can be downloaded from https://dist.apache.org/repos/di ```console # Pick the latest version -export COMET_VERSION=0.7.0 +export COMET_VERSION=0.9.0 # Download the tarball curl -O "https://dist.apache.org/repos/dist/release/datafusion/datafusion-comet-$COMET_VERSION/apache-datafusion-comet-$COMET_VERSION.tar.gz" # Unpack diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 3b495e9f25..04f0c5523e 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -21,18 +21,6 @@ under the License. Comet provides some tuning options to help you get the best performance from your queries. -## Parquet Scans - -Comet currently has three distinct implementations of the Parquet scan operator. The configuration property -`spark.comet.scan.impl` is used to select an implementation. These scans are described in the -[Compatibility Guide]. 
- -[Compatibility Guide]: compatibility.md - -When using `native_datafusion` or `native_iceberg_compat`, there are known performance issues when pushing filters -down to Parquet scans. Until this issue is resolved, performance can be improved by setting -`spark.sql.parquet.filterPushdown=false`. - ## Configuring Tokio Runtime Comet uses a global tokio runtime per executor process using tokio's defaults of one worker thread per core and a