Skip to content
Merged
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ object CometConf extends ShimCometConf {
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
.toLowerCase(Locale.ROOT))

val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
.doc(
"Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
"respected when running the Spark SQL test suite but the default setting " +
"results in poor performance in Comet when using the new native scans, " +
"disabled by default")
.booleanConf
.createWithDefault(false)

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
.doc(
Expand Down
5 changes: 3 additions & 2 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..71ba6533c9d 100644
index ed2e309fa07..a1fb4abe681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644
+ conf
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
Copy link
Copy Markdown
Contributor

@kazuyukitanimura kazuyukitanimura Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add .set("spark.comet.parquet.respectFilterPushdown", "true") at a few more locations?
E.g. TestHive.scala
Could be other locations as well

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the Spark SQL tests are passing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked, and there are no hive tests that reference PARQUET_FILTER_PUSHDOWN_ENABLED.

+
+ if (!isCometScanOnly) {
+ conf
Expand Down
5 changes: 3 additions & 2 deletions dev/diffs/3.5.6.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..71ba6533c9d 100644
index ed2e309fa07..a1fb4abe681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644
+ conf
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
+
+ if (!isCometScanOnly) {
+ conf
Expand Down
5 changes: 3 additions & 2 deletions dev/diffs/4.0.0-preview1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..71ba6533c9d 100644
index ed2e309fa07..a1fb4abe681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644
+ conf
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
+
+ if (!isCometScanOnly) {
+ conf
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Comet provides the following configuration settings.
| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This must be respected when running the Spark SQL test suite, but enabling it results in poor performance in Comet when using the new native scans, so it is disabled by default. | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String)
val optionsMap = CaseInsensitiveMap[String](options)
val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val parquetFilterPushDown = sqlConf.parquetFilterPushDown
val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)

// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
Expand Down
15 changes: 12 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package org.apache.comet.serde
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.math.min

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -2192,9 +2193,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
// Sink operators don't have children
result.clearChildren()

if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) {
// TODO remove flatMap and add error handling for unsupported data filters
val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) &&
CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) {

val dataFilters = new ListBuffer[Expr]()
for (filter <- scan.dataFilters) {
exprToProto(filter, scan.output) match {
case Some(proto) => dataFilters += proto
case _ =>
logWarning(s"Unsupported data filter $filter")
}
}
nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset}
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.Breaks.{break, breakable}
import scala.util.control.Breaks.breakable

import org.scalactic.source.Position
import org.scalatest.Tag
Expand Down Expand Up @@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode,
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) {
if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) {
// FIXME: native_datafusion always pushdown data filters
break()
}
Seq(
("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean
("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
// set the scan impl to SCAN_NATIVE_COMET because many tests are implemented
Expand Down
Loading