From cd22629344295713707f3a115c606f0d0f564c0a Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 11 Oct 2021 16:25:10 -0700
Subject: [PATCH 1/5] [SPARK-36647][SQL] Push down Aggregate (Min/Max/Count)
 for Parquet if filter is on partition col

---
 .../v2/parquet/ParquetScanBuilder.scala       |  8 +++---
 .../ParquetAggregatePushDownSuite.scala       | 26 ++++++++++++++++++-
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index c579867623e1d..7fe88c8e846da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -130,10 +130,10 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // Todo: 1. add support if groupby column is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36646)
-      //       2. add support if filter col is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      // However, if the filter or group by is on partition column,
+      // max/min/count can still be pushed down
+      // Todo: add support if groupby column is partition col
+      //       (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
     }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index c795bd9ff3389..b9461481e1c91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -221,7 +221,7 @@ abstract class ParquetAggregatePushDownSuite
     }
   }

-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
     withParquetTable(data, "t") {
@@ -240,6 +240,30 @@ abstract class ParquetAggregatePushDownSuite
     }
   }

+  test("aggregate push down - aggregate with partition filter can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").parquet(dir.getCanonicalPath)
+      withTempView("tmp") {
+        spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        val enableVectorizedReader = Seq("false", "true")
+        for (testVectorizedReader <- enableVectorizedReader) {
+          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
+            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            val max = sql("SELECT max(id) FROM tmp WHERE p = 0")
+            max.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [MAX(id)]"
+                checkKeywordsExistsInExplain(max, expected_plan_fragment)
+            }
+            checkAnswer(max, Seq(Row(9)))
+          }
+        }
+      }
+    }
+  }
+
   test("aggregate push down - push down only if all the aggregates can be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
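
[Illustration, not part of the patch series: a minimal spark-shell sketch of the behavior patch 1 enables. It assumes a build with this series applied, that Parquet reads take the DataSource V2 path (spark.sql.sources.useV1SourceList set to exclude parquet), and that spark.sql.parquet.aggregatePushdown is the key behind SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED; the output path is arbitrary.]

    // Hypothetical demo path; any writable location works.
    val demoPath = "/tmp/agg_pushdown_demo"

    // Route parquet reads through DSv2 and enable aggregate push down
    // (both config keys are assumptions, see the note above).
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")

    // Same layout as the new test: id is a data column, p a partition column.
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").parquet(demoPath)
    spark.read.parquet(demoPath).createOrReplaceTempView("tmp")

    // Filter on the partition column p: the scan node should report
    // "PushedAggregation: [MAX(id)]".
    spark.sql("SELECT max(id) FROM tmp WHERE p = 0").explain()

    // Filter on the data column id: the aggregate is not pushed down.
    spark.sql("SELECT max(id) FROM tmp WHERE id > 5").explain()
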
From 6cc6f7b9b3b338a8abae56a6a6e83eadcf9db66e Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Tue, 12 Oct 2021 19:39:12 -0700
Subject: [PATCH 2/5] address comments

---
 .../ParquetAggregatePushDownSuite.scala       | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index b9461481e1c91..29e338e069781 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val count = sql("SELECT COUNT(p) FROM tmp")
             count.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -246,10 +245,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val max = sql("SELECT max(id) FROM tmp WHERE p = 0")
             max.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -380,10 +378,9 @@ abstract class ParquetAggregatePushDownSuite
      spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
      withTempView("test") {
        spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-       val enableVectorizedReader = Seq("false", "true")
-       for (testVectorizedReader <- enableVectorizedReader) {
+       Seq("false", "true").foreach { enableVectorizedReader =>
          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-           vectorizedReaderEnabledKey -> testVectorizedReader) {
+           vectorizedReaderEnabledKey -> enableVectorizedReader) {
           val testMinWithTS =
             sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
               "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
@@ -501,10 +498,9 @@ abstract class ParquetAggregatePushDownSuite
   }

   test("aggregate push down - column name case sensitivity") {
-    val enableVectorizedReader = Seq("false", "true")
-    for (testVectorizedReader <- enableVectorizedReader) {
+    Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-        vectorizedReaderEnabledKey -> testVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader) {
         withTempPath { dir =>
           spark.range(10).selectExpr("id", "id % 3 as p")
             .write.partitionBy("p").parquet(dir.getCanonicalPath)
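
[Illustration, not part of the patch series: patch 2 standardizes on the suite's Seq(...).foreach idiom for running each assertion under both Parquet reader implementations. A sketch of that shape, assuming a suite mixing in Spark's SQLTestUtils (which provides withSQLConf) and treating the suite's vectorizedReaderEnabledKey as an alias for SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key:]

    import org.apache.spark.sql.internal.SQLConf

    // Run the enclosed assertions twice: once with the vectorized Parquet
    // reader disabled, once enabled, both times with push down turned on.
    Seq("false", "true").foreach { enableVectorizedReader =>
      withSQLConf(
          SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
          SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> enableVectorizedReader) {
        // query and plan checks go here, evaluated under this configuration
      }
    }
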
From d7ecec4766577ff0a6751e20ac8a8da1ec3810f2 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Tue, 12 Oct 2021 21:16:13 -0700
Subject: [PATCH 3/5] address comments

---
 .../datasources/parquet/ParquetAggregatePushDownSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index 29e338e069781..fb3f5a8094e04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -248,14 +248,14 @@ abstract class ParquetAggregatePushDownSuite
         Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
             vectorizedReaderEnabledKey -> enableVectorizedReader) {
-            val max = sql("SELECT max(id) FROM tmp WHERE p = 0")
+            val max = sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
             max.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
-                  "PushedAggregation: [MAX(id)]"
+                  "[MAX(id), MIN(id), COUNT(id)]"
                 checkKeywordsExistsInExplain(max, expected_plan_fragment)
             }
-            checkAnswer(max, Seq(Row(9)))
+            checkAnswer(max, Seq(Row(9, 0, 4)))
           }
         }
       }

From 1c961383bdd30826c57653a6896bbada89763f75 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Tue, 12 Oct 2021 21:18:11 -0700
Subject: [PATCH 4/5] fix

---
 .../datasources/parquet/ParquetAggregatePushDownSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index fb3f5a8094e04..4422a6a6cb7ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -252,7 +252,7 @@ abstract class ParquetAggregatePushDownSuite
             max.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
-                  "[MAX(id), MIN(id), COUNT(id)]"
+                  "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]"
                 checkKeywordsExistsInExplain(max, expected_plan_fragment)
             }
             checkAnswer(max, Seq(Row(9, 0, 4)))

From 1293ae026dcdc05065c13b6e1a8496a7e2e8082c Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 18 Oct 2021 12:07:12 -0700
Subject: [PATCH 5/5] fix comment

---
 .../execution/datasources/v2/parquet/ParquetScanBuilder.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 7fe88c8e846da..d844240a1a3b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -130,8 +130,7 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
      //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // However, if the filter or group by is on partition column,
-      // max/min/count can still be pushed down
+      // However, if the filter is on partition column, max/min/count can still be pushed down
       // Todo: add support if groupby column is partition col
       //       (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
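
[Illustration, not part of the patch series: after patch 5 the comment distinguishes three cases. Against the partitioned view from the new test (data column id, partition column p), the split looks like this; whether a query is pushed down shows up as a PushedAggregation entry in its explain output:]

    // Pushed down: aggregate alone, or combined with a partition-column filter.
    spark.sql("SELECT min(id), max(id), count(id) FROM tmp")
    spark.sql("SELECT min(id), max(id), count(id) FROM tmp WHERE p = 0")

    // Not pushed down: a data-column filter, or any GROUP BY. Group by on a
    // partition column is the follow-up tracked as SPARK-36646.
    spark.sql("SELECT count(id) FROM tmp WHERE id = 8")
    spark.sql("SELECT count(id) FROM tmp GROUP BY p")
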