Description
Describe the bug
When a column is created with array_agg and given the same name as a column in the original data, calling schema() after a subsequent select of that column returns a field based on the original data rather than on the aggregate.
More concretely: if you call something like .aggregate(vec![col("a"), col("b")], vec![array_agg(col("c")).alias("c")]), print the schema, and then call select_columns("c") (or an equivalent select) and print the schema again, the two schemas differ. This only happens if column c above is a computed value, even one as simple as the expression col("c").alias("c").
The minimal example below shows this.
To Reproduce
The following code demonstrates that selecting the column changes the displayed schema.
The input CSV is a simple table:
a,b,c
1,2,3
4,5,6
7,8,9
10,11,12
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let mut df = ctx
        .read_csv("/Users/tsaucer/working/testing_ballista/lead_lag/example.csv", CsvReadOptions::default())
        .await?;

    println!("Original data:");
    df.clone().show().await?;

    // Re-deriving "c" makes it a computed column; the bug only appears in that case.
    df = df
        .with_column("c", col("c"))?
        .aggregate(vec![col("a"), col("b")], vec![array_agg(col("c")).alias("c")])?;
    println!("\nBefore select\n{}", df.schema().field(2));
    println!("{}", df.logical_plan().display_indent());

    // A trivial select of the existing columns should not change the schema.
    df = df.select(vec![col("a"), col("b"), col("c")])?;
    println!("\nAfter select\n{}", df.schema().field(2));
    println!("{}", df.logical_plan().display_indent());

    println!("\nFinal data:");
    df.show().await?;
    Ok(())
}
Produces the following output:
Original data:
+----+----+----+
| a | b | c |
+----+----+----+
| 1 | 2 | 3 |
| 4 | 5 | 6 |
| 7 | 8 | 9 |
| 10 | 11 | 12 |
+----+----+----+
Before select
Field { name: "c", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Aggregate: groupBy=[[?table?.a, ?table?.b]], aggr=[[ARRAY_AGG(c) AS c]]
Projection: ?table?.a, ?table?.b, ?table?.c AS c
TableScan: ?table?
After select
Field { name: "c", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Projection: ?table?.a, ?table?.b, c
Aggregate: groupBy=[[?table?.a, ?table?.b]], aggr=[[ARRAY_AGG(c) AS c]]
Projection: ?table?.a, ?table?.b, ?table?.c AS c
TableScan: ?table?
Final data:
+----+----+------+
| a | b | c |
+----+----+------+
| 7 | 8 | [9] |
| 4 | 5 | [6] |
| 1 | 2 | [3] |
| 10 | 11 | [12] |
+----+----+------+
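The final table is correct (column c holds lists), but the schema reported after the select is not: before the select, field c has type List(Int64); after it, the same field is reported as Int64, the type of the original column.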
Expected behavior
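The schema should be unchanged by a trivial select of existing columns, whether done with select(vec![col(...)]) or select_columns: after the select, field c should still have type List(Int64).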
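The invariant can be stated as a small check. The sketch below is a toy model, not DataFusion code: the types DataType, Field, and Schema and the functions project_by_name, aggregate_output_schema, and trivial_select_is_invariant are all hypothetical stand-ins for DFSchema and the projection logic. It shows only that resolving each selected name against the projection's input (here, the aggregate's output) and copying the field yields an identical schema. It makes no claim about where the actual bug lives.

```rust
// Hypothetical toy types; these are NOT DataFusion's DFSchema / arrow Field.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(Box<DataType>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

type Schema = Vec<Field>;

/// Resolve each requested name against the projection's *input* schema
/// (the aggregate's output) and copy that field; None if a name is missing.
fn project_by_name(input: &Schema, names: &[&str]) -> Option<Schema> {
    names
        .iter()
        .map(|name| input.iter().find(|f| f.name == *name).cloned())
        .collect()
}

/// Toy version of the schema the issue reports for
/// aggregate([a, b], [array_agg(c).alias("c")]) before the select.
fn aggregate_output_schema() -> Schema {
    vec![
        Field { name: "a".to_string(), data_type: DataType::Int64 },
        Field { name: "b".to_string(), data_type: DataType::Int64 },
        Field {
            name: "c".to_string(),
            data_type: DataType::List(Box::new(DataType::Int64)),
        },
    ]
}

/// True when selecting every column, in order, leaves the schema unchanged.
fn trivial_select_is_invariant(schema: &Schema) -> bool {
    let names: Vec<&str> = schema.iter().map(|f| f.name.as_str()).collect();
    project_by_name(schema, &names).as_ref() == Some(schema)
}
```

In the issue's terms, the "After select" field for c should equal the "Before select" field, List(Int64), rather than falling back to the original Int64 column.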