From ab214b3008eaeb3c71dca16526a6edaef583dc5b Mon Sep 17 00:00:00 2001
From: xieshuaihu
Date: Sat, 15 Nov 2025 18:22:15 +0800
Subject: [PATCH 1/2] [spark] Handle NPE for pushdown aggregate when a
 datasplit has a null max/min value

---
 .../paimon/spark/aggregate/AggFuncEvaluator.scala |  3 ++-
 .../paimon/spark/sql/PushDownAggregatesTest.scala | 13 +++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index fcb64e30645e..4fb0c787f7c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -59,6 +59,7 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.minValue(idx, dataField, evolutions)
+    // if (_result == null || (other != null && CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0)) {
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
       _result = other;
     }
@@ -80,7 +81,7 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.maxValue(idx, dataField, evolutions)
-    if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
+    if (_result == null || (other != null && CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0)) {
       _result = other
     }
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index af99a67b24dc..351efb3b0204 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -294,4 +294,17 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
       Seq(Row(1, "t1"))
     )
   }
+
+  // https://github.com/apache/paimon/issues/6610
+  test("Push down aggregate - MIN/MAX of a column that is all null in one partition but not in the other") {
+    withTable("T") {
+      spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
+
+      spark.sql("INSERT INTO T VALUES (1, 2, '2025-11-10')")
+      spark.sql("INSERT INTO T VALUES (null, 2, '2025-11-11')")
+
+      runAndCheckAggregate("SELECT MIN(c1) FROM T", Row(1) :: Nil, 0)
+      runAndCheckAggregate("SELECT MAX(c1) FROM T", Row(1) :: Nil, 0)
+    }
+  }
 }

From 10cc6113b4ac5ee571a1bb510f7ddaa8041e8198 Mon Sep 17 00:00:00 2001
From: xieshuaihu
Date: Sat, 15 Nov 2025 18:45:40 +0800
Subject: [PATCH 2/2] make code formatter happy

---
 .../apache/paimon/spark/aggregate/AggFuncEvaluator.scala | 9 +++++++--
 .../apache/paimon/spark/sql/PushDownAggregatesTest.scala | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index 4fb0c787f7c7..6282b214d44e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -59,7 +59,9 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.minValue(idx, dataField, evolutions)
-    // if (_result == null || (other != null && CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0)) {
+    if (other == null) {
+      return
+    }
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
       _result = other;
     }
@@ -81,7 +83,10 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.maxValue(idx, dataField, evolutions)
-    if (_result == null || (other != null && CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0)) {
+    if (other == null) {
+      return
+    }
+    if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
       _result = other
     }
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index 351efb3b0204..766fac386ffb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -296,7 +296,7 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
   }
 
   // https://github.com/apache/paimon/issues/6610
-  test("Push down aggregate - MIN/MAX of a column that is all null in one partition but not in the other") {
+  test("Push down aggregate: aggregate a column that is all null in one partition and not in another") {
     withTable("T") {
       spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
 
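
The early-return guard the series lands on is behaviorally the same as the inline `other != null &&` conjunct from the first commit: a split whose stats report no min/max (for example, an all-null column in one partition) simply contributes nothing to the pushed-down aggregate. Below is a minimal standalone sketch of that pattern, with a plain `compare` function standing in for `CompareUtils.compareLiteral` and nullable boxed values standing in for `DataSplit` stats; the names here are illustrative stand-ins, not Paimon API.

// Minimal sketch of the null-safe min update pattern (assumed names, not Paimon API).
// `compare` stands in for CompareUtils.compareLiteral; split stats are reduced to
// plain nullable values for illustration.
final class MinSketch(compare: (AnyRef, AnyRef) => Int) {
  private var _result: AnyRef = null

  def update(other: AnyRef): Unit = {
    // A split that reports no min value (e.g. an all-null column) contributes nothing.
    if (other == null) {
      return
    }
    if (_result == null || compare(_result, other) > 0) {
      _result = other
    }
  }

  def result: AnyRef = _result
}

object MinSketch {
  def main(args: Array[String]): Unit = {
    val min = new MinSketch((a, b) => a.asInstanceOf[Integer].compareTo(b.asInstanceOf[Integer]))
    min.update(Integer.valueOf(1)) // split from partition '2025-11-10': min(c1) = 1
    min.update(null)               // split from partition '2025-11-11': c1 is all null
    println(min.result)            // prints 1 instead of throwing a NullPointerException
  }
}

The max case is symmetric, with the comparison flipped to `< 0`. The first commit's inline `other != null &&` form computes the same result but yields a long condition line, which is presumably what the "make code formatter happy" commit replaces with the early return.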