diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 555beb5cbb..ee18fb9504 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -106,6 +106,16 @@ object CometConf extends ShimCometConf { .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET) .toLowerCase(Locale.ROOT)) + val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] = + conf("spark.comet.parquet.respectFilterPushdown") + .doc( + "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config " + + "('spark.sql.parquet.filterPushdown'). This option must be enabled when running the " + + "Spark SQL test suite, but honoring Spark's default (pushdown enabled) results in poor " + + "performance in Comet when using the new native scans, so it is disabled by default.") + .booleanConf + .createWithDefault(false) + val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.parquet.read.parallel.io.enabled") .doc( diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index c4d318e0bd..e08aea606b 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. 
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index 98de916d0f..883a16715a 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. 
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 8686b4456e..e57a245f04 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..71ba6533c9d 100644 +index ed2e309fa07..a1fb4abe681 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,31 @@ trait SharedSparkSessionBase +@@ -74,6 +74,32 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96bd7cec18..6544909aae 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ Comet provides the following configuration settings. 
| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 | | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true | | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 | +| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config ('spark.sql.parquet.filterPushdown'). This option must be enabled when running the Spark SQL test suite, but honoring Spark's default (pushdown enabled) results in poor performance in Comet when using the new native scans, so it is disabled by default. | false | | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. 
| true | diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index 871ac2704d..5c2de2a6b3 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String) val optionsMap = CaseInsensitiveMap[String](options) val parquetOptions = new ParquetOptions(optionsMap, sqlConf) val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead - val parquetFilterPushDown = sqlConf.parquetFilterPushDown + val parquetFilterPushDown = sqlConf.parquetFilterPushDown && + CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf) // Comet specific configurations val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1e3cec2852..e445cbb7c5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -22,6 +22,7 @@ package org.apache.comet.serde import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.math.min import org.apache.spark.internal.Logging @@ -2192,9 +2193,17 @@ object QueryPlanSerde extends Logging with CometExprShim { // Sink operators don't have children result.clearChildren() - if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) { - // TODO remove flatMap and add error handling for unsupported data filters - val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output)) + if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) && + CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) { + + val dataFilters = new ListBuffer[Expr]() + for (filter <- 
scan.dataFilters) { + exprToProto(filter, scan.output) match { + case Some(proto) => dataFilters += proto + case _ => + logWarning(s"Unsupported data filter $filter") + } + } nativeScanBuilder.addAllDataFilters(dataFilters.asJava) } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 4801678a46..581c606115 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset} import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag -import scala.util.control.Breaks.{break, breakable} +import scala.util.control.Breaks.breakable import org.scalactic.source.Position import org.scalatest.Tag @@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode, SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) { - if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) { - // FIXME: native_datafusion always pushdown data filters - break() - } Seq( ("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean ("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 05c46e307c..a2663cf0b9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -78,6 +78,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true") 
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented