Skip to content

Only trigger Comet Final aggregation on Comet partial aggregation #262

@viirya

Description

@viirya

Describe the bug

Related to #250.

When enabling columnar shuffle, a partial Spark aggregation could be upstream of a Comet final aggregation, because columnar shuffle could be the scan source of Comet native operator.

But there is query failure on SQLQuerySuite's SPARK-3176 Added Parser of SQL LAST() test.

[info]   == Physical Plan ==                                                                                                                                                                                                           
[info]   AdaptiveSparkPlan isFinalPlan=true                                                                                                                                                                                            
[info]   +- == Final Plan ==                                                                                                                                                                                                           
[info]      *(2) ColumnarToRow                                                                                                                                                                                                         
[info]      +- CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]                                                                                                                                               
[info]         +- ShuffleQueryStage 0                                                                                                                                                                                                  [info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]                                                                                                                 [info]               +- RowToColumnar                                                                                                                                                                                                  
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])                                                                                                       
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]                                                                    
[info]                        +- Scan[obj#92]                                                                                                                                                                                          
[info]   +- == Initial Plan ==                                                                                                                                                                                                         
[info]      CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]                                                                                                                                                  
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10279]                                                                                                                       
[info]         +- HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])                                                                                                                     
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]                                                                                  
[info]               +- Scan[obj#92]                                                                                                                                                                                                   
[info]                                                                                                                                                                                                                                 
[info]   == Results ==                                                                                                                                                                                                                 
[info]                                                                                                                                                                                                                                 
[info]   == Results ==                                                                                                                                                                                                                 
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<last(n):int>
[info]   ![4]                        [2] (QueryTest.scala:243)

It is because the aggregation attributes (i.e., state) of Spark Last aggregation expression are different to DataFusion's Last aggregation expression.

Spark Last has:

override lazy val aggBufferAttributes: Seq[AttributeReference] = last :: valueSet :: Nil

DataFusion's Last:

fn state_fields(&self) -> Result<Vec<Field>> {
        let mut fields = vec![Field::new(
            format_state_name(&self.name, "last_value"),
            self.input_data_type.clone(),
            true,
        )];
        fields.extend(ordering_fields(
            &self.ordering_req,
            &self.order_by_data_types,
        ));
        fields.push(Field::new(
            format_state_name(&self.name, "is_set"),
            DataType::Boolean,
            true,
        ));
        Ok(fields)
    }

I think this kind of issues will be more and more. Re-implementing such aggregation expressions in Comet seems taking too much cost on developing and maintaining. These cases are only happened if partial Spark aggregation cannot be transformed to Comet. In the failed query, it is because its upstream is not Comet plan. I think these cases are not what we care about mostly.

I think we should only have Comet final aggregation if the partial aggregation is Comet. It will simplify the things.

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions