Skip to content

WindowFunction works with SQL but not with DataFrame #1147

@aguspdana

Description

@aguspdana

A DataFrame query that uses a WindowFunction expression returns a NotImplemented error, but the equivalent SQL query returns the correct result.

use datafusion::error::Result;    
use datafusion::prelude::*;    
use datafusion::physical_plan::window_functions; 
   
/// Minimal reproduction for this issue: runs the same
/// `first_value(value) OVER (PARTITION BY id)` query against the `log` table
/// twice, once through SQL and once through the DataFrame API, printing each
/// logical plan before executing it.
///
/// # Errors
///
/// Every fallible step is propagated with `?`, so the first failure is
/// returned from `main`. As reported below, the DataFrame half fails at
/// `show()` with `NotImplemented`, while the SQL half succeeds.
#[tokio::main]
async fn main() -> Result<()> {
    // NOTE(review): `sigmo_df` is the reporter's own helper crate; it
    // presumably registers a table named "log" with these two columns.
    let mut ctx = sigmo_df::utils::create_ctx("log", vec![
        sigmo_df::column::Column::String("id", vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("c")
        ]),
        sigmo_df::column::Column::Numeric("value", vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0)
        ])
    ])?;

    // SQL: this path plans and executes successfully.
    let res = ctx.sql("
        SELECT                                                               
            id,
            first_value(value) over (partition by id)
        FROM log;
    ")?;
    println!("SQL Plan:\n{}", res.to_logical_plan().display());
    res.show().await?;

    // DataFrame: build the equivalent window expression by hand.
    let df = ctx.table("log")?;
    let first_row = datafusion::logical_plan::Expr::WindowFunction {
        fun: window_functions::WindowFunction::BuiltInWindowFunction(
            window_functions::BuiltInWindowFunction::FirstValue
        ),
        args: vec![col("value")],
        partition_by: vec![col("id")],
        order_by: vec![],
        window_frame: None
    };
    // Propagate a planning error instead of panicking; `main` already
    // returns `Result`, and every other fallible call here uses `?`.
    let query = df.select(vec![col("id"), first_row])?;
    println!("\nDataFrame Plan:\n{}", query.to_logical_plan().display());
    // This is the call that returns the `NotImplemented` error.
    query.show().await?;

    Ok(())
}
SQL Plan:
Projection: #log.id, #FIRST_VALUE(log.value) PARTITION BY [#log.id]
+----+------------------------+
| id | FIRST_VALUE(log.value) |
+----+------------------------+
| a  | 1                      |
| b  | 2                      |
| c  | 3                      |
| c  | 3                      |
+----+------------------------+

DataFrame Plan:
Projection: #log.id, FIRST_VALUE(#log.value) PARTITION BY [#log.id]
Error: NotImplemented("Physical plan does not support logical expression FIRST_VALUE(#log.value) PARTITION BY [#log.id]")

I've also tried using datafusion::physical_plan::aggregates::AggregateFunction::Sum, and it also returned a NotImplemented error.

The error may be triggered at https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/physical_plan/planner.rs#L1105, but I'm not sure why the SQL query succeeds.

My project dependencies:

[dependencies]           
# arrow = "^5.1"
datafusion = { git = "https://github.com/apache/arrow-datafusion" }
tokio = { version = "1.12.0", features = ["full"] }
chrono = "0.4"

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions