From 55b1322f081ff407ac568a2031ac7e72059b589b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:30:15 -0600
Subject: [PATCH 01/13] Update scan info

---
 docs/source/user-guide/compatibility.md | 20 +++++++++-----------
 docs/source/user-guide/tuning.md        | 12 ------------
 2 files changed, 9 insertions(+), 23 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 5f0842eb4d..40ec72561d 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -38,15 +38,18 @@ Comet does not support reading decimals encoded in binary format.
 ### Parquet Scans
 
 Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
-`spark.comet.scan.impl` is used to select an implementation.
+`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and
+Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration
+settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use
+a particular implementation for all scan operations.
 
 | Implementation | Description |
-| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| ----------------------- |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | `native_comet` | This is the default implementation. It provides strong compatibility with Spark but does not support complex types. |
-| `native_datafusion` | This implementation delegates to DataFusion's `DataSourceExec`. |
 | `native_iceberg_compat` | This implementation also delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. |
+| `native_datafusion` | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan. |
 
-The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet`
+The `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet`
 implementation:
 
 - Leverages the DataFusion community's ongoing improvements to `DataSourceExec`
@@ -64,22 +67,17 @@ logical types. Arrow-based readers, such as DataFusion and Comet do respect thes
 rather than signed. By default, Comet will fall back to Spark when scanning Parquet files containing `byte` or `short`
 types (regardless of the logical type). This behavior can be disabled by setting
 `spark.comet.scan.allowIncompatible=true`.
-- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
-information.
-- Reading maps containing complex types can result in errors or incorrect results [#1754]
 - `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
-- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are
   supported.
-- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.
 
 Issues specific to `native_datafusion`:
 
 - Bucketed scans are not supported
 - No support for row indexes
+- There are failures in the Spark SQL test suite [#1545]
+- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not supported
 
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
-[#1542]: https://github.com/apache/datafusion-comet/issues/1542
-[#1754]: https://github.com/apache/datafusion-comet/issues/1754
 [#1758]: https://github.com/apache/datafusion-comet/issues/1758
 [Comet Tuning Guide]: tuning.md
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 3b495e9f25..04f0c5523e 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -21,18 +21,6 @@ under the License.
 
 Comet provides some tuning options to help you get the best performance from your queries.
 
-## Parquet Scans
-
-Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
-`spark.comet.scan.impl` is used to select an implementation. These scans are described in the
-[Compatibility Guide].
-
-[Compatibility Guide]: compatibility.md
-
-When using `native_datafusion` or `native_iceberg_compat`, there are known performance issues when pushing filters
-down to Parquet scans. Until this issue is resolved, performance can be improved by setting
-`spark.sql.parquet.filterPushdown=false`.
-
 ## Configuring Tokio Runtime
 
 Comet uses a global tokio runtime per executor process using tokio's defaults of one worker thread per core and a

From 768808ee3adf6da21dbd0f2f4fec2be74b540a9c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:34:37 -0600
Subject: [PATCH 02/13] update

---
 docs/source/user-guide/compatibility.md | 203 ++++++++++++------------
 1 file changed, 101 insertions(+), 102 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 40ec72561d..9831a780a5 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -38,16 +38,16 @@ Comet does not support reading decimals encoded in binary format.
 ### Parquet Scans
 
 Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
-`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and
-Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration
-settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use
-a particular implementation for all scan operations.
+`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and
+Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration
+settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use
+a particular implementation for all scan operations.
 
-| Implementation | Description |
-| ----------------------- |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `native_comet` | This is the default implementation. It provides strong compatibility with Spark but does not support complex types. |
-| `native_iceberg_compat` | This implementation also delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. |
-| `native_datafusion` | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan. |
+| Implementation | Description |
+| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `native_comet` | This implementation provides strong compatibility with Spark but does not support complex types. This is the original scan implementation in Comet and may eventually be removed. |
+| `native_iceberg_compat` | This implementation delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. |
+| `native_datafusion` | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan. |
 
 The `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet`
 implementation:
@@ -62,11 +62,11 @@ The new scans currently have the following limitations:
 Issues common to both `native_datafusion` and `native_iceberg_compat`:
 
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-rather than signed. By default, Comet will fall back to Spark when scanning Parquet files containing `byte` or `short`
-types (regardless of the logical type). This behavior can be disabled by setting
-`spark.comet.scan.allowIncompatible=true`.
+  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
+  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
+  rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short`
+  types (regardless of the logical type). This behavior can be disabled by setting
+  `spark.comet.scan.allowIncompatible=true`.
 - `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are
   supported.
@@ -79,7 +79,6 @@ Issues specific to `native_datafusion`:
 
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1758]: https://github.com/apache/datafusion-comet/issues/1758
-[Comet Tuning Guide]: tuning.md
 
 ## ANSI mode
 
 Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
 Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
@@ -123,104 +122,104 @@ Cast operations in Comet fall into three levels of support:
 - **Compatible**: The results match Apache Spark
 - **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
-will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
-`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
-recommended for production use.
+  will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
 - **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
-Spark.
+  Spark.
 
 ### Compatible Casts
 
 The following cast operations are generally compatible with Spark except for the differences noted here.
 
-| From Type | To Type | Notes |
-|-|-|-|
-| boolean | byte | |
-| boolean | short | |
-| boolean | integer | |
-| boolean | long | |
-| boolean | float | |
-| boolean | double | |
-| boolean | string | |
-| byte | boolean | |
-| byte | short | |
-| byte | integer | |
-| byte | long | |
-| byte | float | |
-| byte | double | |
-| byte | decimal | |
-| byte | string | |
-| short | boolean | |
-| short | byte | |
-| short | integer | |
-| short | long | |
-| short | float | |
-| short | double | |
-| short | decimal | |
-| short | string | |
-| integer | boolean | |
-| integer | byte | |
-| integer | short | |
-| integer | long | |
-| integer | float | |
-| integer | double | |
-| integer | string | |
-| long | boolean | |
-| long | byte | |
-| long | short | |
-| long | integer | |
-| long | float | |
-| long | double | |
-| long | string | |
-| float | boolean | |
-| float | byte | |
-| float | short | |
-| float | integer | |
-| float | long | |
-| float | double | |
-| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
-| double | boolean | |
-| double | byte | |
-| double | short | |
-| double | integer | |
-| double | long | |
-| double | float | |
-| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
-| decimal | byte | |
-| decimal | short | |
-| decimal | integer | |
-| decimal | long | |
-| decimal | float | |
-| decimal | double | |
-| decimal | decimal | |
-| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not |
-| string | boolean | |
-| string | byte | |
-| string | short | |
-| string | integer | |
-| string | long | |
-| string | binary | |
-| string | date | Only supports years between 262143 BC and 262142 AD |
-| date | string | |
-| timestamp | long | |
-| timestamp | string | |
-| timestamp | date | |
+| From Type | To Type | Notes |
+| --------- | ------- | --------------------------------------------------------------------------------------------------------------- |
+| boolean | byte | |
+| boolean | short | |
+| boolean | integer | |
+| boolean | long | |
+| boolean | float | |
+| boolean | double | |
+| boolean | string | |
+| byte | boolean | |
+| byte | short | |
+| byte | integer | |
+| byte | long | |
+| byte | float | |
+| byte | double | |
+| byte | decimal | |
+| byte | string | |
+| short | boolean | |
+| short | byte | |
+| short | integer | |
+| short | long | |
+| short | float | |
+| short | double | |
+| short | decimal | |
+| short | string | |
+| integer | boolean | |
+| integer | byte | |
+| integer | short | |
+| integer | long | |
+| integer | float | |
+| integer | double | |
+| integer | string | |
+| long | boolean | |
+| long | byte | |
+| long | short | |
+| long | integer | |
+| long | float | |
+| long | double | |
+| long | string | |
+| float | boolean | |
+| float | byte | |
+| float | short | |
+| float | integer | |
+| float | long | |
+| float | double | |
+| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
+| double | boolean | |
+| double | byte | |
+| double | short | |
+| double | integer | |
+| double | long | |
+| double | float | |
+| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
+| decimal | byte | |
+| decimal | short | |
+| decimal | integer | |
+| decimal | long | |
+| decimal | float | |
+| decimal | double | |
+| decimal | decimal | |
+| decimal | string | There can be formatting differences in some cases due to Spark using scientific notation where Comet does not |
+| string | boolean | |
+| string | byte | |
+| string | short | |
+| string | integer | |
+| string | long | |
+| string | binary | |
+| string | date | Only supports years between 262143 BC and 262142 AD |
+| date | string | |
+| timestamp | long | |
+| timestamp | string | |
+| timestamp | date | |
 
 ### Incompatible Casts
 
 The following cast operations are not compatible with Spark for all inputs and are disabled by default.
 
-| From Type | To Type | Notes |
-|-|-|-|
-| integer | decimal | No overflow check |
-| long | decimal | No overflow check |
-| float | decimal | There can be rounding differences |
-| double | decimal | There can be rounding differences |
-| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
-| string | timestamp | Not all valid formats are supported |
-| binary | string | Only works for binary data representing valid UTF-8 strings |
+| From Type | To Type | Notes |
+| --------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| integer | decimal | No overflow check |
+| long | decimal | No overflow check |
+| float | decimal | There can be rounding differences |
+| double | decimal | There can be rounding differences |
+| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
+| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
+| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
+| string | timestamp | Not all valid formats are supported |
+| binary | string | Only works for binary data representing valid UTF-8 strings |
 
 ### Unsupported Casts

From bf8869383663ff9109f90c27d47a63a311d0c22b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:38:09 -0600
Subject: [PATCH 03/13] update

---
 docs/source/user-guide/compatibility.md | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 9831a780a5..1327b84f3f 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -41,7 +41,8 @@ Comet currently has three distinct implementations of the Parquet scan operator.
 `spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and
 Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration
 settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use
-a particular implementation for all scan operations.
+a particular implementation for all scan operations by setting this configuration property to one of the following
+implementations.
 
 | Implementation | Description |
 | ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
@@ -57,9 +58,7 @@ implementation:
 - Removes the use of reusable mutable-buffers in Comet, which is complex to maintain
 - Improves performance
 
-The new scans currently have the following limitations:
-
-Issues common to both `native_datafusion` and `native_iceberg_compat`:
+The `native_datafusion` and `native_iceberg_compat` scans shared the following limitations:
 
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
@@ -70,7 +69,7 @@
 - `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are
   supported.
 
-Issues specific to `native_datafusion`:
+The `native_datafusion` scan has some additional limitations:
 
 - Bucketed scans are not supported
 - No support for row indexes

From 6abcd84baa99181b16f4bafb7d74399c4d3fefd4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:39:19 -0600
Subject: [PATCH 04/13] update

---
 docs/source/user-guide/compatibility.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 1327b84f3f..db0425f6c1 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -58,7 +58,7 @@ implementation:
 - Removes the use of reusable mutable-buffers in Comet, which is complex to maintain
 - Improves performance
 
-The `native_datafusion` and `native_iceberg_compat` scans shared the following limitations:
+The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
 
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
@@ -88,7 +88,7 @@ be used in production.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully
 implement ANSI support.
 
-## Floating number comparison
+## Floating-point Number Comparison
 
 Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization
 rule in Spark. However, one exception is comparison. Spark does not normalize NaN and zero when comparing values

From 720cd2e7fae958214a535894914fda7494321785 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:39:43 -0600
Subject: [PATCH 05/13] update

---
 docs/source/user-guide/compatibility.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index db0425f6c1..d94dd65062 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -79,7 +79,7 @@ The `native_datafusion` scan has some additional limitations:
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1758]: https://github.com/apache/datafusion-comet/issues/1758
 
-## ANSI mode
+## ANSI Mode
 
 Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
 Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,

From d406336c81da5ba683b1ff51e2ac4aed4c83cd35 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:51:12 -0600
Subject: [PATCH 06/13] update

---
 docs/source/index.rst                  |  3 ++-
 docs/source/user-guide/datasources.md  |  6 ++++++
 docs/source/user-guide/datatypes.md    |  6 ++++--
 docs/source/user-guide/iceberg.md      | 15 +++++----------
 docs/source/user-guide/installation.md |  2 +-
 docs/source/user-guide/overview.md     |  8 --------
 6 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/docs/source/index.rst b/docs/source/index.rst
index e8efc2ea76..a47c8f1f54 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,7 +43,6 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
    Comet Overview
    Installing Comet
    Building From Source
-   Kubernetes Guide
    Supported Data Sources
    Supported Data Types
    Supported Operators
    Supported Expressions
    Compatibility Guide
    Tuning Guide
    Metrics Guide
+   Iceberg Guide
+   Kubernetes Guide
 
 .. _toc.contributor-guide-links:
 .. toctree::
diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md
index bd28297f91..0e3988e58f 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -28,6 +28,12 @@ in the schema are supported. When this option is not enabled, the scan will fall
 enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native
 execution to happen after that, but the process may not be efficient.
 
+### Apache Iceberg
+
+Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for more information.
+
+[Iceberg Guide]: iceberg.md
+
 ### CSV
 
 Comet does not provide native CSV scan, but when `spark.comet.convert.csv.enabled` is enabled, data is immediately
diff --git a/docs/source/user-guide/datatypes.md b/docs/source/user-guide/datatypes.md
index 28aae0060b..3b7940dd67 100644
--- a/docs/source/user-guide/datatypes.md
+++ b/docs/source/user-guide/datatypes.md
@@ -39,5 +39,7 @@ The following Spark data types are currently available:
 - Timestamp
 - TimestampNTZ
 - Null
-- Struct
-- Array
+- Complex Types
+  - Struct
+  - Array
+  - Map
diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md
index b191f42a75..af39f58804 100644
--- a/docs/source/user-guide/iceberg.md
+++ b/docs/source/user-guide/iceberg.md
@@ -50,17 +50,12 @@ Clone the Iceberg repository.
 git clone git@github.com:apache/iceberg.git
 ```
 
-It will be necessary to make some small changes to Iceberg:
+It will be necessary to make some changes to Iceberg.
 
-- Update Gradle files to change Comet version to `0.10.0-SNAPSHOT`.
-- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;`
-- Modify `SparkBatchQueryScan` so that it implements the `SupportsComet` interface
-- Stop shading Parquet by commenting out the following lines in the iceberg-spark build:
-
-```
-// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
-// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
-```
+For Iceberg version 1.8.1, the diff file `dev/diffs/iceberg/1.8.1.diff` can be applied.
 
+See the GitHub workflow `.github/workflows/iceberg_spark_test.yml` for a full example of integrating Comet and
+Iceberg and running Iceberg's Spark SQL tests with Comet enabled.
 
 Perform a clean build
@@ -93,7 +88,7 @@
 $SPARK_HOME/bin/spark-shell \
     --conf spark.sql.iceberg.parquet.reader-type=COMET \
     --conf spark.comet.explainFallback.enabled=true \
     --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g
+    --conf spark.memory.offHeap.size=2g
 ```
 
 Create an Iceberg table. Note that Comet will not accelerate this part.
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index 0e8412915b..4d5849dd42 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -51,7 +51,7 @@ use only and should not be used in production yet.
 
 | Spark Version | Java Version | Scala Version | Comet Tests in CI | Spark SQL Tests in CI |
 | -------------- | ------------ | ------------- | ----------------- |-----------------------|
-| 4.0.0-preview1 | 17 | 2.13 | Yes | Yes |
+| 4.0.0 | 17 | 2.13 | Yes | Yes |
 
 Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by
 Cloud Service Providers.
diff --git a/docs/source/user-guide/overview.md b/docs/source/user-guide/overview.md
index 92dfe2bb94..f011885645 100644
--- a/docs/source/user-guide/overview.md
+++ b/docs/source/user-guide/overview.md
@@ -30,14 +30,6 @@ The following diagram provides an overview of Comet's architecture.
 
 ![Comet Overview](../_static/images/comet-overview.png)
 
-Comet aims to support:
-
-- a native Parquet implementation, including both reader and writer
-- full implementation of Spark operators, including
-  Filter/Project/Aggregation/Join/Exchange etc.
-- full implementation of Spark built-in expressions.
-- a UDF framework for users to migrate their existing UDF to native
-
 ## Architecture
 
 The following diagram shows how Comet integrates with Apache Spark.

From 7e5222c12763efb6278038be917d561e37c6f8db Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 3 Jul 2025 06:54:03 -0600
Subject: [PATCH 07/13] update

---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
index 195cd9d693..c3159820b3 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,8 @@ Apache DataFusion Comet is a high-performance accelerator for Apache Spark, buil
 performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the Spark
 ecosystem without requiring any code changes.
 
+Comet also accelerates Apache Iceberg, when performing Parquet scans from Spark.
+ [Apache DataFusion]: https://datafusion.apache.org # Benefits of Using Comet From 94d8643d332722b840662a50b1f17b47efbe6e14 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 07:13:06 -0600 Subject: [PATCH 08/13] update --- docs/source/contributor-guide/benchmarking_aws_ec2.md | 4 ++-- docs/source/user-guide/datasources.md | 2 +- docs/source/user-guide/installation.md | 11 ++++++----- docs/source/user-guide/kubernetes.md | 6 +++--- docs/source/user-guide/source.md | 2 +- 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/source/contributor-guide/benchmarking_aws_ec2.md b/docs/source/contributor-guide/benchmarking_aws_ec2.md index 0ec33bf7ed..bf0b89f67d 100644 --- a/docs/source/contributor-guide/benchmarking_aws_ec2.md +++ b/docs/source/contributor-guide/benchmarking_aws_ec2.md @@ -179,8 +179,8 @@ $SPARK_HOME/bin/spark-submit \ Install Comet JAR from Maven: ```shell -wget https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.7.0/comet-spark-spark3.5_2.12-0.7.0.jar -P $SPARK_HOME/jars -export COMET_JAR=$SPARK_HOME/jars/comet-spark-spark3.5_2.12-0.7.0.jar +wget https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar -P $SPARK_HOME/jars +export COMET_JAR=$SPARK_HOME/jars/comet-spark-spark3.5_2.12-0.9.0.jar ``` Run the following command (the `--data` parameter will need to be updated to point to your S3 bucket): diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index 0e3988e58f..45a1e18b02 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -94,7 +94,7 @@ root | |-- lastName: string (nullable = true) | |-- ageInYears: integer (nullable = true) -25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.7.0 initialized +25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized == Physical Plan == * CometColumnarToRow (2) +- 
CometNativeScan: (1) diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md index 4d5849dd42..ecad423dbd 100644 --- a/docs/source/user-guide/installation.md +++ b/docs/source/user-guide/installation.md @@ -61,12 +61,13 @@ Cloud Service Providers. Comet jar files are available in [Maven Central](https://central.sonatype.com/namespace/org.apache.datafusion) for amd64 and arm64 architectures for Linux. For Apple macOS, it is currently necessary to build from source. -Here are the direct links for downloading the Comet 0.7.0 jar file. +Here are the direct links for downloading the Comet 0.9.0 jar file. -- [Comet plugin for Spark 3.4 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.12/0.7.0/comet-spark-spark3.4_2.12-0.7.0.jar) -- [Comet plugin for Spark 3.4 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.13/0.7.0/comet-spark-spark3.4_2.13-0.7.0.jar) -- [Comet plugin for Spark 3.5 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.7.0/comet-spark-spark3.5_2.12-0.7.0.jar) -- [Comet plugin for Spark 3.5 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.13/0.7.0/comet-spark-spark3.5_2.13-0.7.0.jar) +- [Comet plugin for Spark 3.4 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.12/0.9.0/comet-spark-spark3.4_2.12-0.9.0.jar) +- [Comet plugin for Spark 3.4 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.13/0.9.0/comet-spark-spark3.4_2.13-0.9.0.jar) +- [Comet plugin for Spark 3.5 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar) +- [Comet plugin for Spark 3.5 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.13/0.9.0/comet-spark-spark3.5_2.13-0.9.0.jar) +- [Comet plugin 
for Spark 4.0 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark4.0_2.13/0.9.0/comet-spark-spark4.0_2.13-0.9.0.jar) ## Building from source diff --git a/docs/source/user-guide/kubernetes.md b/docs/source/user-guide/kubernetes.md index aa6ae37ae1..f695f1ce7f 100644 --- a/docs/source/user-guide/kubernetes.md +++ b/docs/source/user-guide/kubernetes.md @@ -66,13 +66,13 @@ metadata: spec: type: Scala mode: cluster - image: apache/datafusion-comet:0.7.0-spark3.5.5-scala2.12-java11 + image: apache/datafusion-comet:0.9.0-spark3.5.5-scala2.12-java11 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar sparkConf: - "spark.executor.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.7.0.jar" - "spark.driver.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.7.0.jar" + "spark.executor.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.9.0.jar" + "spark.driver.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.9.0.jar" "spark.plugins": "org.apache.spark.CometPlugin" "spark.comet.enabled": "true" "spark.comet.exec.enabled": "true" diff --git a/docs/source/user-guide/source.md b/docs/source/user-guide/source.md index b7038d341c..e49323b26d 100644 --- a/docs/source/user-guide/source.md +++ b/docs/source/user-guide/source.md @@ -27,7 +27,7 @@ Official source releases can be downloaded from https://dist.apache.org/repos/di ```console # Pick the latest version -export COMET_VERSION=0.7.0 +export COMET_VERSION=0.9.0 # Download the tarball curl -O "https://dist.apache.org/repos/dist/release/datafusion/datafusion-comet-$COMET_VERSION/apache-datafusion-comet-$COMET_VERSION.tar.gz" # Unpack From d7b9606b1f07e671c2b6d5a7f07c26d2a47848be Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 07:17:29 -0600 Subject: [PATCH 09/13] update --- docs/source/user-guide/kubernetes.md | 6 +++--- 1 file changed,
3 insertions(+), 3 deletions(-) diff --git a/docs/source/user-guide/kubernetes.md b/docs/source/user-guide/kubernetes.md index f695f1ce7f..aa6ae37ae1 100644 --- a/docs/source/user-guide/kubernetes.md +++ b/docs/source/user-guide/kubernetes.md @@ -66,13 +66,13 @@ metadata: spec: type: Scala mode: cluster - image: apache/datafusion-comet:0.9.0-spark3.5.5-scala2.12-java11 + image: apache/datafusion-comet:0.7.0-spark3.5.5-scala2.12-java11 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar sparkConf: - "spark.executor.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.9.0.jar" - "spark.driver.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.9.0.jar" + "spark.executor.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.7.0.jar" + "spark.driver.extraClassPath": "/opt/spark/jars/comet-spark-spark3.5_2.12-0.7.0.jar" "spark.plugins": "org.apache.spark.CometPlugin" "spark.comet.enabled": "true" "spark.comet.exec.enabled": "true" From 0bf8b3496f22f7798e1eab6191e6e01932c5276b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 07:24:09 -0600 Subject: [PATCH 10/13] update --- docs/source/user-guide/compatibility.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index d94dd65062..84c4aab0e9 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -66,13 +66,13 @@ The `native_datafusion` and `native_iceberg_compat` scans share the following li rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short` types (regardless of the logical type). This behavior can be disabled by setting `spark.comet.scan.allowIncompatible=true`.
-- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported. The `native_datafusion` scan has some additional limitations: - Bucketed scans are not supported - No support for row indexes +- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] - There are failures in the Spark SQL test suite [#1545] - Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with Spark From ccb12087142a9fe5726e2d4c08729a07ea6e881c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 08:07:35 -0600 Subject: [PATCH 11/13] update --- docs/source/user-guide/iceberg.md | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index af39f58804..6f91f02047 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -44,19 +44,15 @@ export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.10.0-SNAPSHOT.ja ## Build Iceberg -Clone the Iceberg repository. +Clone the Iceberg repository and apply code changes needed by Comet ```shell git clone git@github.com:apache/iceberg.git +cd iceberg +git checkout apache-iceberg-1.8.1 +git apply ../datafusion-comet/dev/diffs/iceberg/1.8.1.diff ``` -It will be necessary to make some changes to Iceberg. - -For Iceberg version 1.8.1, the diff file `dev/diffs/iceberg/1.8.1.diff` can be applied. - -See the GitHub workflow `.github/workflows/iceberg_spark_test.yml` for a full example of integrating Comet and Iceberg and running Iceberg's Spark SQL tests with Comet enabled.
- Perform a clean build ```shell From 5f606df47144fad929425146f7c228b9e7b09068 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 08:11:04 -0600 Subject: [PATCH 12/13] update --- docs/source/user-guide/iceberg.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 6f91f02047..b57ac857c1 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -65,7 +65,7 @@ Perform a clean build Set `ICEBERG_JAR` environment variable. ```shell -export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.10.0-SNAPSHOT.jar +export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.9.0-SNAPSHOT.jar ``` Launch Spark Shell: @@ -104,12 +104,6 @@ This should produce the following output: ``` scala> spark.sql(s"SELECT * from t1").show() -25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized -25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging): - CollectLimit +- Project [COMET: toprettystring is not supported] - +- CometScanWrapper +---+---+ | c0| c1| +---+---+ @@ -136,3 +130,12 @@ scala> spark.sql(s"SELECT * from t1").show() +---+---+ only showing top 20 rows ``` + +Confirm that the query was accelerated by Comet: + +```shell +scala> spark.sql(s"SELECT * from t1").explain() +== Physical Plan == +*(1) CometColumnarToRow ++- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: [] +``` \ No newline at end of file From ecd938fc31f2a3015d1bd45626b421425840d7ae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Jul 2025 08:11:17 -0600 Subject: [PATCH 13/13] update --- docs/source/user-guide/iceberg.md | 2 +- 1 file
changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index b57ac857c1..bc507e3a65 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -133,7 +133,7 @@ only showing top 20 rows Confirm that the query was accelerated by Comet: -```shell +``` scala> spark.sql(s"SELECT * from t1").explain() == Physical Plan == *(1) CometColumnarToRow