diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index fcb64e30645e..6282b214d44e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -59,6 +59,9 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.minValue(idx, dataField, evolutions)
+    if (other == null) {
+      return
+    }
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
       _result = other;
     }
@@ -80,6 +83,9 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.maxValue(idx, dataField, evolutions)
+    if (other == null) {
+      return
+    }
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
       _result = other
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index af99a67b24dc..766fac386ffb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -294,4 +294,17 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
       Seq(Row(1, "t1"))
     )
   }
+
+  // https://github.com/apache/paimon/issues/6610
+  test("Push down aggregate: aggregate a column in one partition is all null and another is not") {
+    withTable("T") {
+      spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
+
+      spark.sql("INSERT INTO T VALUES (1, 2, '2025-11-10')")
+      spark.sql("INSERT INTO T VALUES (null, 2, '2025-11-11')")
+
+      runAndCheckAggregate("SELECT MIN(c1) FROM T", Row(1) :: Nil, 0)
+      runAndCheckAggregate("SELECT MAX(c1) FROM T", Row(1) :: Nil, 0)
+    }
+  }
 }
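
For context, the guard added to `MinEvaluator.update` and `MaxEvaluator.update` skips splits whose statistics report a null min/max (for example a partition where the column is entirely null), so such a split can no longer overwrite an extremum accumulated from other splits. The sketch below is a minimal, self-contained illustration of that folding behaviour under this assumption; `MinFoldSketch`, `foldMin`, and `splitMins` are hypothetical names and plain Scala `Option`s, not the Paimon `DataSplit`/`SimpleStatsEvolutions` API.

```scala
// Minimal sketch (not the Paimon API) of folding per-split MIN statistics
// while ignoring splits that have no minimum to report.
object MinFoldSketch {

  // Hypothetical per-split minima; None models a split whose column is
  // entirely null and therefore carries a null min statistic.
  val splitMins: Seq[Option[Int]] = Seq(Some(1), None)

  // Fold per-split minima into a table-level minimum, skipping null stats,
  // mirroring the null guard added to MinEvaluator.update in the patch.
  def foldMin(mins: Seq[Option[Int]]): Option[Int] =
    mins.foldLeft(Option.empty[Int]) {
      case (acc, None)          => acc                     // skip all-null split
      case (None, Some(v))      => Some(v)                 // first real minimum seen
      case (Some(cur), Some(v)) => Some(math.min(cur, v))  // keep the smaller value
    }

  def main(args: Array[String]): Unit = {
    // Prints Some(1): the all-null partition no longer clobbers the result.
    println(foldMin(splitMins))
  }
}
```

Under this folding rule, the scenario in the new test (one partition with c1 = 1, one with c1 = null) yields 1 for both MIN(c1) and MAX(c1), matching the Row(1) expected by runAndCheckAggregate.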