diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 84d285fc2509d..b2b912d8add20 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -85,6 +85,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; +use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; use async_trait::async_trait; @@ -848,9 +849,16 @@ impl DefaultPhysicalPlanner { }) => { let input = children.one()?; let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + let list_column_indices = list_type_columns + .iter() + .map(|(index, unnesting)| ListUnnest { + index_in_input_schema: *index, + depth: unnesting.depth, + }) + .collect(); Arc::new(UnnestExec::new( input, - list_type_columns.clone(), + list_column_indices, struct_type_columns.clone(), schema, options.clone(), diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 171ef9561e55c..3520ab8fed2b3 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1391,7 +1391,7 @@ async fn unnest_with_redundant_columns() -> Result<()> { let optimized_plan = df.clone().into_optimized_plan()?; let expected = vec![ "Projection: shapes.shape_id [shape_id:UInt32]", - " Unnest: lists[shape_id2] structs[] [shape_id:UInt32, shape_id2:UInt32;N]", + " Unnest: lists[shape_id2|depth=1] structs[] [shape_id:UInt32, shape_id2:UInt32;N]", " Aggregate: groupBy=[[shapes.shape_id]], aggr=[[array_agg(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]", " TableScan: shapes projection=[shape_id] [shape_id:UInt32]", ]; diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c141324962037..02a2edb98016d 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2346,8 +2346,7 @@ impl fmt::Display for Expr { }, Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"), Expr::Unnest(Unnest { expr }) => { - // TODO: use Display instead of Debug, there is non-unique expression name in projection issue. 
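For context on the `Display` change below: `Expr::Unnest` now renders its inner expression with `Display` instead of `Debug`. A minimal sketch of the visible difference (constructing the expression directly; the Debug output shown is indicative, not verbatim):

```rust
use datafusion_expr::expr::Unnest;
use datafusion_expr::{col, Expr};

fn main() {
    let e = Expr::Unnest(Unnest {
        expr: Box::new(col("tbl.points")),
    });
    // Debug-based rendering printed the raw structure, roughly:
    //   UNNEST(Column(Column { relation: Some(...), name: "points" }))
    // Display-based rendering yields the readable form:
    assert_eq!(e.to_string(), "UNNEST(tbl.points)");
}
```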
-                write!(f, "UNNEST({expr:?})")
+                write!(f, "UNNEST({expr})")
             }
         }
     }
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index f4dad085422b1..c38ce2d02fd24 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,6 +54,8 @@ use datafusion_common::{
     TableReference, ToDFSchema, UnnestOptions,
 };
+use super::plan::{ColumnUnnestList, ColumnUnnestType};
+
 /// Default table name for unnamed table
 pub const UNNAMED_TABLE: &str = "?table?";
@@ -1173,7 +1175,7 @@ impl LogicalPlanBuilder {
     ) -> Result<Self> {
         unnest_with_options(
             Arc::unwrap_or_clone(self.plan),
-            vec![column.into()],
+            vec![(column.into(), ColumnUnnestType::Inferred)],
             options,
         )
         .map(Self::new)
@@ -1184,6 +1186,26 @@ impl LogicalPlanBuilder {
         self,
         columns: Vec<Column>,
         options: UnnestOptions,
+    ) -> Result<Self> {
+        unnest_with_options(
+            Arc::unwrap_or_clone(self.plan),
+            columns
+                .into_iter()
+                .map(|c| (c, ColumnUnnestType::Inferred))
+                .collect(),
+            options,
+        )
+        .map(Self::new)
+    }
+
+    /// Unnest the given columns with the given [`UnnestOptions`].
+    /// If a column is a list type, it can be recursively and simultaneously
+    /// unnested to the desired recursion levels,
+    /// e.g. `select unnest(list_col, depth=1), unnest(list_col, depth=2)`
+    pub fn unnest_columns_recursive_with_options(
+        self,
+        columns: Vec<(Column, ColumnUnnestType)>,
+        options: UnnestOptions,
     ) -> Result<Self> {
         unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
             .map(Self::new)
@@ -1561,7 +1583,61 @@ impl TableSource for LogicalTableSource {
 /// Create a [`LogicalPlan::Unnest`] plan
 pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
-    unnest_with_options(input, columns, UnnestOptions::default())
+    let unnestings = columns
+        .into_iter()
+        .map(|c| (c, ColumnUnnestType::Inferred))
+        .collect();
+    unnest_with_options(input, unnestings, UnnestOptions::default())
+}
+
+pub fn get_unnested_list_datatype_recursive(
+    data_type: &DataType,
+    depth: usize,
+) -> Result<DataType> {
+    match data_type {
+        DataType::List(field)
+        | DataType::FixedSizeList(field, _)
+        | DataType::LargeList(field) => {
+            if depth == 1 {
+                return Ok(field.data_type().clone());
+            }
+            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
+        }
+        _ => {}
+    };
+
+    internal_err!("trying to unnest on invalid data type {:?}", data_type)
+}
+
+/// Infer the unnest type based on the data type:
+/// - list type: inferred as unnest(list(col, depth=1))
+/// - struct type: inferred as unnest(struct)
+fn infer_unnest_type(
+    col_name: &String,
+    data_type: &DataType,
+) -> Result<ColumnUnnestType> {
+    match data_type {
+        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
+            Ok(ColumnUnnestType::List(vec![ColumnUnnestList {
+                output_column: Column::from_name(col_name),
+                depth: 1,
+            }]))
+        }
+        DataType::Struct(_) => Ok(ColumnUnnestType::Struct),
+        _ => {
+            internal_err!("trying to unnest on invalid data type {:?}", data_type)
+        }
+    }
+}
+
+pub fn get_struct_unnested_columns(
+    col_name: &String,
+    inner_fields: &Fields,
+) -> Vec<Column> {
+    inner_fields
+        .iter()
+        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
+        .collect()
+}
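A quick illustration of `get_struct_unnested_columns` above (assuming the helper is in scope; field names are hypothetical): unnesting a struct column yields one output column per field, named `<col>.<field>`:

```rust
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // A struct column "s" with two fields (illustrative)
    let fields = Fields::from(vec![
        Field::new("a", DataType::UInt32, false),
        Field::new("b", DataType::Utf8, true),
    ]);
    let cols = get_struct_unnested_columns(&"s".to_string(), &fields);
    let names: Vec<String> = cols.iter().map(|c| c.name.clone()).collect();
    assert_eq!(names, vec!["s.a", "s.b"]);
}
```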
 // Based on data type, either struct or a variant of list
@@ -1570,19 +1646,20 @@ pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
 // For example, given a column with name "a",
 // - List(Element) returns ["a"] with data type Element
 // - Struct(field1, field2) returns ["a.field1","a.field2"]
+// For list data type, an argument depth is used to specify
+// the recursion level
 pub fn get_unnested_columns(
     col_name: &String,
     data_type: &DataType,
+    depth: usize,
 ) -> Result<Vec<(Column, Arc<Field>)>> {
     let mut qualified_columns = Vec::with_capacity(1);
     match data_type {
-        DataType::List(field)
-        | DataType::FixedSizeList(field, _)
-        | DataType::LargeList(field) => {
+        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
+            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
             let new_field = Arc::new(Field::new(
-                col_name,
-                field.data_type().clone(),
+                col_name, data_type,
                 // Unnesting may produce NULLs even if the list is not null.
                 // For example: unnest([1], []) -> 1, null
                 true,
@@ -1611,52 +1688,126 @@ pub fn get_unnested_columns(
 }

 /// Create a [`LogicalPlan::Unnest`] plan with options
+/// This function receives a list of columns to be unnested
+/// because multiple unnests can be performed on the same column (e.g. unnest with different depths)
+/// The new schema will contain post-unnest fields replacing the original field
+///
+/// For example:
+/// Input schema as
+/// ```text
+/// +---------------------+-------------------+
+/// | col1                | col2              |
+/// +---------------------+-------------------+
+/// | Struct(INT64,INT32) | List(List(Int64)) |
+/// +---------------------+-------------------+
+/// ```
+///
+///
+///
+/// Then unnesting columns with:
+/// - (col1,Struct)
+/// - (col2,List(\[depth=1,depth=2\]))
+///
+/// will generate a new schema as
+/// ```text
+/// +---------+---------+---------------------+---------------------+
+/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
+/// +---------+---------+---------------------+---------------------+
+/// | Int64   | Int32   | List(Int64)         | Int64               |
+/// +---------+---------+---------------------+---------------------+
+/// ```
 pub fn unnest_with_options(
     input: LogicalPlan,
-    columns: Vec<Column>,
+    columns_to_unnest: Vec<(Column, ColumnUnnestType)>,
     options: UnnestOptions,
 ) -> Result<LogicalPlan> {
-    let mut list_columns = Vec::with_capacity(columns.len());
-    let mut struct_columns = Vec::with_capacity(columns.len());
-    let column_by_original_index = columns
+    let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
+    let mut struct_columns = vec![];
+    let indices_to_unnest = columns_to_unnest
         .iter()
-        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
-        .collect::<Result<HashMap<usize, &Column>>>()?;
+        .map(|col_unnesting| {
+            Ok((
+                input.schema().index_of_column(&col_unnesting.0)?,
+                col_unnesting,
+            ))
+        })
+        .collect::<Result<HashMap<usize, &(Column, ColumnUnnestType)>>>()?;

     let input_schema = input.schema();

     let mut dependency_indices = vec![];
     // Transform input schema into new schema
-    // e.g int, unnest([]int), unnest(struct(varchar,varchar))
-    // becomes int, int, varchar, varchar
+    // Given this comprehensive example
+    //
+    // input schema:
+    // 1.col1_unnest_placeholder: list[list[int]],
+    // 2.col1: list[list[int]]
+    // 3.col2: list[int]
+    // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
+    // output schema:
+    // 1.unnest_col1_depth_2: int
+    // 2.unnest_col1_depth_1: list[int]
+    // 3.col1: list[list[int]]
+    // 4.unnest_col2_depth_1: int
+    // Meaning the placeholder column will be replaced by its unnested variation(s), note
+    // the plural.
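+    // For the example schema above, this loop records list unnestings as
+    // (input index, ColumnUnnestList) pairs in `list_columns` (e.g. (1, col2|depth=1)
+    // and (1, col2|depth=2)) and struct unnestings by input index in `struct_columns`;
+    // `dependency_indices` repeats the original index once per generated output column,
+    // so every post-unnest column can be traced back to the column it came from.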
let fields = input_schema .iter() .enumerate() .map(|(index, (original_qualifier, original_field))| { - match column_by_original_index.get(&index) { - Some(&column_to_unnest) => { - let flatten_columns = get_unnested_columns( - &column_to_unnest.name, - original_field.data_type(), - )?; - match original_field.data_type() { - DataType::List(_) - | DataType::FixedSizeList(_, _) - | DataType::LargeList(_) => list_columns.push(index), - DataType::Struct(_) => struct_columns.push(index), - _ => { - panic!( - "not reachable, should be caught by get_unnested_columns" - ) - } + match indices_to_unnest.get(&index) { + Some((column_to_unnest, unnest_type)) => { + let mut inferred_unnest_type = unnest_type.clone(); + if let ColumnUnnestType::Inferred = unnest_type { + inferred_unnest_type = infer_unnest_type( + &column_to_unnest.name, + original_field.data_type(), + )?; } + let transformed_columns: Vec<(Column, Arc)> = + match inferred_unnest_type { + ColumnUnnestType::Struct => { + struct_columns.push(index); + get_unnested_columns( + &column_to_unnest.name, + original_field.data_type(), + 1, + )? + } + ColumnUnnestType::List(unnest_lists) => { + list_columns.extend( + unnest_lists + .iter() + .map(|ul| (index, ul.to_owned().clone())), + ); + unnest_lists + .iter() + .map( + |ColumnUnnestList { + output_column, + depth, + }| { + get_unnested_columns( + &output_column.name, + original_field.data_type(), + *depth, + ) + }, + ) + .collect::)>>>>()? + .into_iter() + .flatten() + .collect::>() + } + _ => return internal_err!("Invalid unnest type"), + }; // new columns dependent on the same original index dependency_indices - .extend(std::iter::repeat(index).take(flatten_columns.len())); - Ok(flatten_columns + .extend(std::iter::repeat(index).take(transformed_columns.len())); + Ok(transformed_columns .iter() - .map(|col: &(Column, Arc)| { - (col.0.relation.to_owned(), col.1.to_owned()) + .map(|(col, data_type)| { + (col.relation.to_owned(), data_type.to_owned()) }) .collect()) } @@ -1682,7 +1833,7 @@ pub fn unnest_with_options( Ok(LogicalPlan::Unnest(Unnest { input: Arc::new(input), - exec_columns: columns, + exec_columns: columns_to_unnest, list_type_columns: list_columns, struct_type_columns: struct_columns, dependency_indices, @@ -1693,6 +1844,7 @@ pub fn unnest_with_options( #[cfg(test)] mod tests { + use super::*; use crate::logical_plan::StringifiedPlan; use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery}; @@ -2039,7 +2191,7 @@ mod tests { .build()?; let expected = "\ - Unnest: lists[test_table.strings] structs[]\ + Unnest: lists[test_table.strings|depth=1] structs[]\ \n TableScan: test_table"; assert_eq!(expected, format!("{plan}")); @@ -2075,8 +2227,8 @@ mod tests { let expected = "\ Unnest: lists[] structs[test_table.struct_singular]\ - \n Unnest: lists[test_table.structs] structs[]\ - \n Unnest: lists[test_table.strings] structs[]\ + \n Unnest: lists[test_table.structs|depth=1] structs[]\ + \n Unnest: lists[test_table.strings|depth=1] structs[]\ \n TableScan: test_table"; assert_eq!(expected, format!("{plan}")); @@ -2084,17 +2236,18 @@ mod tests { let field = plan.schema().field_with_name(None, "structs").unwrap(); assert!(matches!(field.data_type(), DataType::Struct(_))); - // Unnesting multiple fields at the same time + // Unnesting multiple fields at the same time, using infer syntax let cols = vec!["strings", "structs", "struct_singular"] .into_iter() .map(|c| c.into()) .collect(); + let plan = nested_table_scan("test_table")? 
.unnest_columns_with_options(cols, UnnestOptions::default())? .build()?; let expected = "\ - Unnest: lists[test_table.strings, test_table.structs] structs[test_table.struct_singular]\ + Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]\ \n TableScan: test_table"; assert_eq!(expected, format!("{plan}")); @@ -2102,6 +2255,57 @@ mod tests { let plan = nested_table_scan("test_table")?.unnest_column("missing"); assert!(plan.is_err()); + // Simultaneously unnesting a list (with different depth) and a struct column + let plan = nested_table_scan("test_table")? + .unnest_columns_recursive_with_options( + vec![ + ( + "stringss".into(), + ColumnUnnestType::List(vec![ + ColumnUnnestList { + output_column: Column::from_name("stringss_depth_1"), + depth: 1, + }, + ColumnUnnestList { + output_column: Column::from_name("stringss_depth_2"), + depth: 2, + }, + ]), + ), + ("struct_singular".into(), ColumnUnnestType::Inferred), + ], + UnnestOptions::default(), + )? + .build()?; + + let expected = "\ + Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]\ + \n TableScan: test_table"; + assert_eq!(expected, format!("{plan}")); + + // Check output columns has correct type + let field = plan + .schema() + .field_with_name(None, "stringss_depth_1") + .unwrap(); + assert_eq!( + &DataType::new_list(DataType::Utf8, false), + field.data_type() + ); + let field = plan + .schema() + .field_with_name(None, "stringss_depth_2") + .unwrap(); + assert_eq!(&DataType::Utf8, field.data_type()); + // unnesting struct is still correct + for field_name in &["a", "b"] { + let field = plan + .schema() + .field_with_name(None, &format!("struct_singular.{}", field_name)) + .unwrap(); + assert_eq!(&DataType::UInt32, field.data_type()); + } + Ok(()) } @@ -2117,6 +2321,7 @@ mod tests { false, ); let string_field = Field::new("item", DataType::Utf8, false); + let strings_field = Field::new_list("item", string_field.clone(), false); let schema = Schema::new(vec![ Field::new("scalar", DataType::UInt32, false), Field::new_list("strings", string_field, false), @@ -2129,6 +2334,7 @@ mod tests { ])), false, ), + Field::new_list("stringss", strings_field, false), ]); table_scan(Some(table_name), &schema, None) diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 5a881deb54e17..26d54803d4036 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -643,8 +643,14 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { let input_columns = plan.schema().columns(); let list_type_columns = list_col_indices .iter() - .map(|i| &input_columns[*i]) - .collect::>(); + .map(|(i, unnest_info)| { + format!( + "{}|depth={:?}", + &input_columns[*i].to_string(), + unnest_info.depth + ) + }) + .collect::>(); let struct_type_columns = struct_col_indices .iter() .map(|i| &input_columns[*i]) diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5b5a842fa4cf8..a189d4635e001 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -35,11 +35,11 @@ pub use ddl::{ }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ - projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, - DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, - JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, - RecursiveQuery, Repartition, 
Sort, StringifiedPlan, Subquery, SubqueryAlias,
-    TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
+    projection_schema, Aggregate, Analyze, ColumnUnnestList, ColumnUnnestType, CrossJoin,
+    DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
+    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
+    Projection, RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery,
+    SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
 };
 pub use statement::{
     SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 6a88382061cdb..443d23804adb2 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1974,8 +1974,10 @@ impl LogicalPlan {
                         let input_columns = plan.schema().columns();
                         let list_type_columns = list_col_indices
                             .iter()
-                            .map(|i| &input_columns[*i])
-                            .collect::<Vec<&Column>>();
+                            .map(|(i, unnest_info)|
+                                format!("{}|depth={}", &input_columns[*i].to_string(),
+                                unnest_info.depth))
+                            .collect::<Vec<String>>();
                         let struct_type_columns = struct_col_indices
                             .iter()
                             .map(|i| &input_columns[*i])
                             .collect::<Vec<&Column>>();
@@ -3298,6 +3300,70 @@ pub enum Partitioning {
     DistributeBy(Vec<Expr>),
 }

+/// Represents the unnesting operation on a column based on the context (a known struct
+/// column, a list column, or let the planner infer the unnesting type).
+///
+/// The inferred unnesting type works for both struct and list columns, but the unnesting
+/// will only be done once (depth = 1). In case recursion is needed on a multi-dimensional
+/// list type, use [`ColumnUnnestList`]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
+pub enum ColumnUnnestType {
+    // Unnesting a list column; a vector of ColumnUnnestList is used because
+    // a column can be unnested at different levels, resulting in different output columns
+    List(Vec<ColumnUnnestList>),
+    // For structs, only one unnest can be performed on a column at a time
+    Struct,
+    // Infer the unnest type based on the column schema.
+    // If the column is a list column, the unnest depth will be 1.
+    // This value supports the sugar syntax of the old DataFrame API (unnest(either_list_or_struct_column))
+    Inferred,
+}
+
+impl fmt::Display for ColumnUnnestType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            ColumnUnnestType::List(lists) => {
+                let list_strs: Vec<String> =
+                    lists.iter().map(|list| list.to_string()).collect();
+                write!(f, "List([{}])", list_strs.join(", "))
+            }
+            ColumnUnnestType::Struct => write!(f, "Struct"),
+            ColumnUnnestType::Inferred => write!(f, "Inferred"),
+        }
+    }
+}
+
+/// Represents the unnesting operation on a list column, such as the recursion depth and
+/// the output column name after unnesting
+///
+/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
+///
+/// ```text
+///   input            output_name
+/// ┌─────────┐      ┌─────────┐
+/// │{{1,2}}  │      │ 1       │
+/// ├─────────┼─────►├─────────┤
+/// │{{3}}    │      │ 2       │
+/// ├─────────┤      ├─────────┤
+/// │{{4},{5}}│      │ 3       │
+/// └─────────┘      ├─────────┤
+///                  │ 4       │
+///                  ├─────────┤
+///                  │ 5       │
+///                  └─────────┘
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
+pub struct ColumnUnnestList {
+    pub output_column: Column,
+    pub depth: usize,
+}
+
+impl fmt::Display for ColumnUnnestList {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}|depth={}", self.output_column, self.depth)
+    }
+}
+
 /// Unnest a column that 
contains a nested list type. See /// [`UnnestOptions`] for more details. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -3305,10 +3371,10 @@ pub struct Unnest { /// The incoming logical plan pub input: Arc, /// Columns to run unnest on, can be a list of (List/Struct) columns - pub exec_columns: Vec, + pub exec_columns: Vec<(Column, ColumnUnnestType)>, /// refer to the indices(in the input schema) of columns /// that have type list to run unnest on - pub list_type_columns: Vec, + pub list_type_columns: Vec<(usize, ColumnUnnestList)>, /// refer to the indices (in the input schema) of columns /// that have type struct to run unnest on pub struct_type_columns: Vec, @@ -3329,10 +3395,10 @@ impl PartialOrd for Unnest { /// The incoming logical plan pub input: &'a Arc, /// Columns to run unnest on, can be a list of (List/Struct) columns - pub exec_columns: &'a Vec, + pub exec_columns: &'a Vec<(Column, ColumnUnnestType)>, /// refer to the indices(in the input schema) of columns /// that have type list to run unnest on - pub list_type_columns: &'a Vec, + pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>, /// refer to the indices (in the input schema) of columns /// that have type struct to run unnest on pub struct_type_columns: &'a Vec, diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 0964fb6018793..8ba68697bd4d7 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -497,7 +497,7 @@ impl LogicalPlan { let exprs = columns .iter() - .map(|c| Expr::Column(c.clone())) + .map(|(c, _)| Expr::Column(c.clone())) .collect::>(); exprs.iter().apply_until_stop(f) } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index a306ff7d2dbc3..b3f4e8c7d9dc1 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -743,7 +743,9 @@ impl OptimizerRule for PushDownFilter { let mut accum: HashSet = HashSet::new(); expr_to_columns(&predicate, &mut accum)?; - if unnest.exec_columns.iter().any(|c| accum.contains(c)) { + if unnest.list_type_columns.iter().any(|(_, unnest_list)| { + accum.contains(&unnest_list.output_column) + }) { unnest_predicates.push(predicate); } else { non_unnest_predicates.push(predicate); diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index b99d0d8388709..50af6b4960a50 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -17,6 +17,7 @@ //! Define a plan for unnesting values in columns that contain a list type. +use std::cmp::{self, Ordering}; use std::collections::HashMap; use std::{any::Any, sync::Arc}; @@ -42,7 +43,6 @@ use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, Result, UnnestOptions, }; use datafusion_execution::TaskContext; -use datafusion_expr::ColumnarValue; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; @@ -63,7 +63,7 @@ pub struct UnnestExec { /// The schema once the unnest is applied schema: SchemaRef, /// indices of the list-typed columns in the input schema - list_column_indices: Vec, + list_column_indices: Vec, /// indices of the struct-typed columns in the input schema struct_column_indices: Vec, /// Options @@ -78,7 +78,7 @@ impl UnnestExec { /// Create a new [UnnestExec]. 
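    ///
    /// A sketch of how the planner might construct this node for `unnest(col)` plus
    /// `unnest(col, depth=2)` on the column at input index 0 (names are illustrative):
    ///
    /// ```ignore
    /// let exec = UnnestExec::new(
    ///     input,
    ///     vec![
    ///         ListUnnest { index_in_input_schema: 0, depth: 1 },
    ///         ListUnnest { index_in_input_schema: 0, depth: 2 },
    ///     ],
    ///     vec![], // no struct columns to unnest
    ///     output_schema,
    ///     UnnestOptions::default(),
    /// );
    /// ```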
pub fn new(
         input: Arc<dyn ExecutionPlan>,
-        list_column_indices: Vec<usize>,
+        list_column_indices: Vec<ListUnnest>,
         struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
@@ -116,7 +116,7 @@ impl UnnestExec {
     }

     /// indices of the list-typed columns in the input schema
-    pub fn list_column_indices(&self) -> &[usize] {
+    pub fn list_column_indices(&self) -> &[ListUnnest] {
         &self.list_column_indices
     }
@@ -245,8 +245,10 @@ struct UnnestStream {
     input: SendableRecordBatchStream,
     /// Unnested schema
     schema: Arc<Schema>,
-    /// The unnest columns
-    list_type_columns: Vec<usize>,
+    /// Represents all unnest operations to be applied to the input (input index, depth),
+    /// e.g. for unnest(col1), unnest(unnest(col1)) where col1 has index 1 in the original input schema,
+    /// list_type_columns = [ListUnnest{1,1}, ListUnnest{1,2}]
+    list_type_columns: Vec<ListUnnest>,
     struct_column_indices: HashSet<usize>,
     /// Options
     options: UnnestOptions,
@@ -358,61 +360,292 @@ fn flatten_struct_cols(
     Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
 }

+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub struct ListUnnest {
+    pub index_in_input_schema: usize,
+    pub depth: usize,
+}
+
+/// This function is used to execute the unnesting on multiple columns all at once, but
+/// one level at a time, and is called n times, where n is the highest recursion level among
+/// the unnest exprs in the query.
+///
+/// For example, given the following query:
+/// ```sql
+/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
+/// ```
+/// this function is called 3 times in total.
+///
+/// It needs to be aware of which level the current unnesting is at, because if there exist
+/// multiple unnestings on the same column, but with different recursion levels, say
+/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
+/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
+/// **unnest(colA, max_depth:=2)** has to start at level 2.
+///
+/// Take *colA* as a 3-dimensional column and *colB* as an array (1-dimensional). As stated,
+/// this function is called in descending order of recursion depth.
+///
+/// Depth = 3
+/// - colA(3-dimension) unnest into temp column temp_P1(2-dimension) (unnesting of P1 starts
+/// from this level)
+/// - colA(3-dimension) having indices repeated by the unnesting operation above
+/// - colB(1-dimension) having indices repeated by the unnesting operation above
+///
+/// Depth = 2
+/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
+/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
+/// from this level)
+/// - colB(1-dimension) having indices repeated by the unnesting operation above
+///
+/// Depth = 1
+/// - temp_P1(1-dimension) unnest into P1
+/// - temp_P2(2-dimension) unnest into P2
+/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
+///
+/// The returned array has the same size as the input batch
+/// and only contains the original columns that are not being unnested.
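+///
+/// As a small, hypothetical illustration: take one input row with colA = [[1, 2]]
+/// (unnested via unnest(colA, max_depth:=2)) and colB = [7] (unnested via unnest(colB)).
+/// At level 2, colA unnests into a temp array whose single row is [1, 2], and colB's row
+/// is repeated alongside it; at level 1, the temp array unnests into rows 1 and 2, while
+/// colB unnests into 7 plus a padding NULL, since shorter lists are padded to the longest
+/// length at each level.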
+fn list_unnest_at_level(
+    batch: &[ArrayRef],
+    list_type_unnests: &[ListUnnest],
+    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
+    level_to_unnest: usize,
+    options: &UnnestOptions,
+) -> Result<(Vec<ArrayRef>, usize)> {
+    // extract unnestable columns at this level
+    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
+        list_type_unnests
+            .iter()
+            .filter_map(|unnesting| {
+                if level_to_unnest == unnesting.depth {
+                    return Some((
+                        Arc::clone(&batch[unnesting.index_in_input_schema]),
+                        *unnesting,
+                    ));
+                }
+                // this means the unnesting on this item started at a higher level
+                // and needs to continue until the depth reaches 1
+                if level_to_unnest < unnesting.depth {
+                    return Some((
+                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
+                        *unnesting,
+                    ));
+                }
+                None
+            })
+            .unzip();
+
+    // filter out so that list_arrays only contain columns with the highest depth;
+    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
+    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
+    let unnested_length = longest_length.as_primitive::<Int64Type>();
+    let total_length = if unnested_length.is_empty() {
+        0
+    } else {
+        sum(unnested_length).ok_or_else(|| {
+            exec_datafusion_err!("Failed to calculate the total unnested length")
+        })? as usize
+    };
+    if total_length == 0 {
+        return Ok((vec![], 0));
+    }
+
+    // Unnest all the list arrays
+    let unnested_temp_arrays =
+        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
+
+    // Create the take indices array for other columns
+    let take_indices = create_take_indicies(unnested_length, total_length);
+
+    // The dimension of the arrays in the batch is untouched, but the values are repeated
+    // as a side effect of unnesting
+    let ret = repeat_arrs_from_indices(batch, &take_indices)?;
+    unnested_temp_arrays
+        .into_iter()
+        .zip(list_unnest_specs.iter())
+        .for_each(|(flatten_arr, unnesting)| {
+            temp_unnested_arrs.insert(*unnesting, flatten_arr);
+        });
+    Ok((ret, total_length))
+}
+struct UnnestingResult {
+    arr: ArrayRef,
+    depth: usize,
+}

 /// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
 /// - For list columns: We will expand the values in each list into multiple rows,
 ///   taking the longest length among these lists, and shorter lists are padded with NULLs.
 /// - For struct columns: We will expand the struct columns into multiple subfield columns.
 ///
 /// For columns that don't need to be unnested, repeat their values until reaching the longest length.
+///
+/// Note: unnest behaves very differently in Postgres and DuckDB.
+///
+/// Take this example:
+///
+/// 1. Postgres
+/// ```ignore
+/// create table temp (
+///     i integer[][][], j integer[]
+/// )
+/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
+/// select unnest(i), unnest(j) from temp;
+/// ```
+///
+/// Result
+/// ```text
+///     1   1
+///     2   2
+///     3
+///     4
+///     5
+///     6
+///     7
+///     8
+/// ```
+/// 2. DuckDB
+/// ```ignore
+/// create table temp (i integer[][][], j integer[]);
+/// insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
+/// select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
+/// ```
+/// Result:
+/// ```text
+///
+/// ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
+/// │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
+/// │                     int32                      │                     int32                      │
+/// ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
+/// │                       1                        │                       1                        │
+/// │                       2                        │                       2                        │
+/// │                       3                        │                       1                        │
+/// │                       4                        │                       2                        │
+/// │                       5                        │                       1                        │
+/// │                       6                        │                       2                        │
+/// │                       7                        │                       1                        │
+/// │                       8                        │                       2                        │
+/// └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
+/// ```
+///
+/// The following implementation refers to DuckDB's implementation
 fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    list_type_columns: &[usize],
+    list_type_columns: &[ListUnnest],
     struct_column_indices: &HashSet<usize>,
     options: &UnnestOptions,
 ) -> Result<RecordBatch> {
     let transformed = match list_type_columns.len() {
         0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
         _ => {
-            let list_arrays: Vec<ArrayRef> = list_type_columns
+            let mut temp_unnested_result = HashMap::new();
+            let max_recursion = list_type_columns
                 .iter()
-                .map(|index| {
-                    ColumnarValue::Array(Arc::clone(batch.column(*index)))
-                        .into_array(batch.num_rows())
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            let longest_length = find_longest_length(&list_arrays, options)?;
-            let unnested_length = longest_length.as_primitive::<Int64Type>();
-            let total_length = if unnested_length.is_empty() {
-                0
-            } else {
-                sum(unnested_length).ok_or_else(|| {
-                    exec_datafusion_err!("Failed to calculate the total unnested length")
-                })? as usize
-            };
-            if total_length == 0 {
-                return Ok(RecordBatch::new_empty(Arc::clone(schema)));
+                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
+                    cmp::max(highest_depth, *depth)
+                });
+
+            // This arr always has the same column count as the input batch
+            let mut flatten_arrs = vec![];
+
+            // The original batch keeps the same columns;
+            // all unnesting results are written to temp_unnested_result
+            for depth in (1..=max_recursion).rev() {
+                let input = match depth == max_recursion {
+                    true => batch.columns(),
+                    false => &flatten_arrs,
+                };
+                let (temp_result, num_rows) = list_unnest_at_level(
+                    input,
+                    list_type_columns,
+                    &mut temp_unnested_result,
+                    depth,
+                    options,
+                )?;
+                if num_rows == 0 {
+                    return Ok(RecordBatch::new_empty(Arc::clone(schema)));
+                }
+                flatten_arrs = temp_result;
             }
-
-            // Unnest all the list arrays
-            let unnested_arrays =
-                unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
-            let unnested_array_map: HashMap<_, _> = unnested_arrays
-                .into_iter()
-                .zip(list_type_columns.iter())
-                .map(|(array, column)| (*column, array))
+            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
+                temp_unnested_result.into_iter().fold(
+                    HashMap::new(),
+                    |mut acc,
+                     (
+                        ListUnnest {
+                            index_in_input_schema,
+                            depth,
+                        },
+                        flattened_array,
+                    )| {
+                        acc.entry(index_in_input_schema).or_default().push(
+                            UnnestingResult {
+                                arr: flattened_array,
+                                depth,
+                            },
+                        );
+                        acc
+                    },
+                );
+            let output_order: HashMap<ListUnnest, usize> = list_type_columns
+                .iter()
+                .enumerate()
+                .map(|(order, unnest_def)| (*unnest_def, order))
                 .collect();

-            // Create the take indices array for other columns
-            let take_indicies = create_take_indicies(unnested_length, total_length);
+            // one original column may be unnested multiple times into separate columns
+            let mut multi_unnested_per_original_index = unnested_array_map
+                .into_iter()
+                .map(
+                    // each item in unnested_columns is the result of unnesting the same input column;
+                    // we need to sort them to conform with the original expression order,
+                    // e.g. unnest(unnest(col)) must go before unnest(col)
+                    |(original_index, mut unnested_columns)| {
+                        unnested_columns.sort_by(
+                            |UnnestingResult { depth: depth1, .. },
+                             UnnestingResult { depth: depth2, .. }|
+                             -> Ordering {
+                                output_order
+                                    .get(&ListUnnest {
+                                        depth: *depth1,
+                                        index_in_input_schema: original_index,
+                                    })
+                                    .unwrap()
+                                    .cmp(
+                                        output_order
+                                            .get(&ListUnnest {
+                                                depth: *depth2,
+                                                index_in_input_schema: original_index,
+                                            })
+                                            .unwrap(),
+                                    )
+                            },
+                        );
+                        (
+                            original_index,
+                            unnested_columns
+                                .into_iter()
+                                .map(|result| result.arr)
+                                .collect::<Vec<_>>(),
+                        )
+                    },
+                )
+                .collect::<HashMap<_, _>>();
+
+            let ret = flatten_arrs
+                .into_iter()
+                .enumerate()
+                .flat_map(|(col_idx, arr)| {
+                    // convert the original column into its unnested version(s).
+                    // Plural, because one column can be unnested with different recursion levels
+                    // and into separate output columns
+                    match multi_unnested_per_original_index.remove(&col_idx) {
+                        Some(unnested_arrays) => unnested_arrays,
+                        None => vec![arr],
+                    }
+                })
+                .collect::<Vec<_>>();

-            // vertical expansion because of list unnest
-            let ret = flatten_list_cols_from_indices(
-                batch,
-                &unnested_array_map,
-                &take_indicies,
-            )?;
             flatten_struct_cols(&ret, schema, struct_column_indices)
         }
     };
@@ -628,10 +861,10 @@ fn create_take_indicies(
     builder.finish()
 }

-/// Create the final batch given the unnested column arrays and a `indices` array
+/// Create the batch given the arrays and an `indices` array
 /// that is used by the take kernel to copy values.
/// -/// For example if we have the following `RecordBatch`: +/// For example if we have the following batch: /// /// ```ignore /// c1: [1], null, [2, 3, 4], null, [5, 6] @@ -659,27 +892,23 @@ fn create_take_indicies( /// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd' /// ``` /// -fn flatten_list_cols_from_indices( - batch: &RecordBatch, - unnested_list_arrays: &HashMap, +fn repeat_arrs_from_indices( + batch: &[ArrayRef], indices: &PrimitiveArray, ) -> Result>> { - let arrays = batch - .columns() + batch .iter() - .enumerate() - .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) { - Some(unnested_array) => Ok(Arc::clone(unnested_array)), - None => Ok(kernels::take::take(arr, indices, None)?), - }) - .collect::>>()?; - Ok(arrays) + .map(|arr| Ok(kernels::take::take(arr, indices, None)?)) + .collect::>() } #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::Field; + use arrow::{ + datatypes::{Field, Int32Type}, + util::pretty::pretty_format_batches, + }; use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; @@ -765,6 +994,139 @@ mod tests { Ok(()) } + #[test] + fn test_build_batch_list_arr_recursive() -> datafusion_common::Result<()> { + // col1 | col2 + // [[1,2,3],null,[4,5]] | ['a','b'] + // [[7,8,9,10], null, [11,12,13]] | ['c','d'] + // null | ['e'] + let list_arr1 = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + None, + Some(vec![Some(4), Some(5)]), + Some(vec![Some(7), Some(8), Some(9), Some(10)]), + None, + Some(vec![Some(11), Some(12), Some(13)]), + ]); + + let list_arr1_ref = Arc::new(list_arr1) as ArrayRef; + let offsets = OffsetBuffer::from_lengths([3, 3, 0]); + let mut nulls = BooleanBufferBuilder::new(3); + nulls.append(true); + nulls.append(true); + nulls.append(false); + // list> + let col1_field = Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field( + list_arr1_ref.data_type().to_owned(), + true, + ))), + true, + ); + let col1 = ListArray::new( + Arc::new(Field::new_list_field( + list_arr1_ref.data_type().to_owned(), + true, + )), + offsets, + list_arr1_ref, + Some(NullBuffer::new(nulls.finish())), + ); + + let list_arr2 = StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ]); + + let offsets = OffsetBuffer::from_lengths([2, 2, 1]); + let mut nulls = BooleanBufferBuilder::new(3); + nulls.append_n(3, true); + let col2_field = Field::new( + "col2", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + ); + let col2 = GenericListArray::::new( + Arc::new(Field::new_list_field(DataType::Utf8, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(list_arr2), + Some(NullBuffer::new(nulls.finish())), + ); + // convert col1 and col2 to a record batch + let schema = Arc::new(Schema::new(vec![col1_field, col2_field])); + let out_schema = Arc::new(Schema::new(vec![ + Field::new( + "col1_unnest_placeholder_depth_1", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true), + Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef], + ) + .unwrap(); + let list_type_columns = vec![ + ListUnnest { + index_in_input_schema: 0, + depth: 1, + }, + ListUnnest { + index_in_input_schema: 0, + depth: 2, + }, + ListUnnest { + 
index_in_input_schema: 1, + depth: 1, + }, + ]; + let ret = build_batch( + &batch, + &out_schema, + list_type_columns.as_ref(), + &HashSet::default(), + &UnnestOptions { + preserve_nulls: true, + }, + )?; + let actual = + format!("{}", pretty_format_batches(vec![ret].as_ref())?).to_lowercase(); + let expected = r#" ++---------------------------------+---------------------------------+---------------------------------+ +| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 | ++---------------------------------+---------------------------------+---------------------------------+ +| [1, 2, 3] | 1 | a | +| | 2 | b | +| [4, 5] | 3 | | +| [1, 2, 3] | | a | +| | | b | +| [4, 5] | | | +| [1, 2, 3] | 4 | a | +| | 5 | b | +| [4, 5] | | | +| [7, 8, 9, 10] | 7 | c | +| | 8 | d | +| [11, 12, 13] | 9 | | +| | 10 | | +| [7, 8, 9, 10] | | c | +| | | d | +| [11, 12, 13] | | | +| [7, 8, 9, 10] | 11 | c | +| | 12 | d | +| [11, 12, 13] | 13 | | +| | | e | ++---------------------------------+---------------------------------+---------------------------------+ + "# + .trim(); + assert_eq!(actual, expected); + Ok(()) + } + #[test] fn test_unnest_list_array() -> datafusion_common::Result<()> { // [A, B, C], [], NULL, [D], NULL, [NULL, F] diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 645df14a03374..1204c843fdb12 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -262,13 +262,35 @@ message CopyToNode { message UnnestNode { LogicalPlanNode input = 1; - repeated datafusion_common.Column exec_columns = 2; - repeated uint64 list_type_columns = 3; + repeated ColumnUnnestExec exec_columns = 2; + repeated ColumnUnnestListItem list_type_columns = 3; repeated uint64 struct_type_columns = 4; repeated uint64 dependency_indices = 5; datafusion_common.DfSchema schema = 6; UnnestOptions options = 7; } +message ColumnUnnestListItem { + uint32 input_index = 1; + ColumnUnnestListRecursion recursion = 2; +} + +message ColumnUnnestListRecursions { + repeated ColumnUnnestListRecursion recursions = 2; +} + +message ColumnUnnestListRecursion { + datafusion_common.Column output_column = 1; + uint32 depth = 2; +} + +message ColumnUnnestExec { + datafusion_common.Column column = 1; + oneof UnnestType { + ColumnUnnestListRecursions list = 2; + datafusion_common.EmptyMessage struct = 3; + datafusion_common.EmptyMessage inferred = 4; + } +} message UnnestOptions { bool preserve_nulls = 1; @@ -758,11 +780,16 @@ message ParquetSinkExecNode { message UnnestExecNode { PhysicalPlanNode input = 1; datafusion_common.Schema schema = 2; - repeated uint64 list_type_columns = 3; + repeated ListUnnest list_type_columns = 3; repeated uint64 struct_type_columns = 4; UnnestOptions options = 5; } +message ListUnnest { + uint32 index_in_input_schema = 1; + uint32 depth = 2; +} + message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e493f761b51f7..0614e33b7a4b8 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -2321,6 +2321,458 @@ impl<'de> serde::Deserialize<'de> for ColumnIndex { deserializer.deserialize_struct("datafusion.ColumnIndex", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ColumnUnnestExec { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: 
serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.column.is_some() { + len += 1; + } + if self.unnest_type.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ColumnUnnestExec", len)?; + if let Some(v) = self.column.as_ref() { + struct_ser.serialize_field("column", v)?; + } + if let Some(v) = self.unnest_type.as_ref() { + match v { + column_unnest_exec::UnnestType::List(v) => { + struct_ser.serialize_field("list", v)?; + } + column_unnest_exec::UnnestType::Struct(v) => { + struct_ser.serialize_field("struct", v)?; + } + column_unnest_exec::UnnestType::Inferred(v) => { + struct_ser.serialize_field("inferred", v)?; + } + } + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ColumnUnnestExec { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "column", + "list", + "struct", + "inferred", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Column, + List, + Struct, + Inferred, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "column" => Ok(GeneratedField::Column), + "list" => Ok(GeneratedField::List), + "struct" => Ok(GeneratedField::Struct), + "inferred" => Ok(GeneratedField::Inferred), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ColumnUnnestExec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ColumnUnnestExec") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut column__ = None; + let mut unnest_type__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Column => { + if column__.is_some() { + return Err(serde::de::Error::duplicate_field("column")); + } + column__ = map_.next_value()?; + } + GeneratedField::List => { + if unnest_type__.is_some() { + return Err(serde::de::Error::duplicate_field("list")); + } + unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::List) +; + } + GeneratedField::Struct => { + if unnest_type__.is_some() { + return Err(serde::de::Error::duplicate_field("struct")); + } + unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::Struct) +; + } + GeneratedField::Inferred => { + if unnest_type__.is_some() { + return Err(serde::de::Error::duplicate_field("inferred")); + } + unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::Inferred) +; + } + } + } + Ok(ColumnUnnestExec { + column: column__, + unnest_type: unnest_type__, + }) + } + } + deserializer.deserialize_struct("datafusion.ColumnUnnestExec", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ColumnUnnestListItem { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input_index != 0 { + len += 1; + } + if self.recursion.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ColumnUnnestListItem", len)?; + if self.input_index != 0 { + struct_ser.serialize_field("inputIndex", &self.input_index)?; + } + if let Some(v) = self.recursion.as_ref() { + struct_ser.serialize_field("recursion", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ColumnUnnestListItem { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input_index", + "inputIndex", + "recursion", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + InputIndex, + Recursion, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "inputIndex" | "input_index" => Ok(GeneratedField::InputIndex), + "recursion" => Ok(GeneratedField::Recursion), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ColumnUnnestListItem; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ColumnUnnestListItem") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut input_index__ = None; + let mut recursion__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::InputIndex => { + if input_index__.is_some() { + return Err(serde::de::Error::duplicate_field("inputIndex")); + } + input_index__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Recursion => { + if recursion__.is_some() { + return Err(serde::de::Error::duplicate_field("recursion")); + } + recursion__ = map_.next_value()?; + } + } + } + Ok(ColumnUnnestListItem { + input_index: input_index__.unwrap_or_default(), + recursion: recursion__, + }) + } + } + deserializer.deserialize_struct("datafusion.ColumnUnnestListItem", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ColumnUnnestListRecursion { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.output_column.is_some() { + len += 1; + } + if self.depth != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ColumnUnnestListRecursion", len)?; + if let Some(v) = self.output_column.as_ref() { + struct_ser.serialize_field("outputColumn", v)?; + } + if self.depth != 0 { + struct_ser.serialize_field("depth", &self.depth)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ColumnUnnestListRecursion { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "output_column", + "outputColumn", + "depth", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + OutputColumn, + Depth, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "outputColumn" | "output_column" => Ok(GeneratedField::OutputColumn), + "depth" => Ok(GeneratedField::Depth), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ColumnUnnestListRecursion; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ColumnUnnestListRecursion") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut output_column__ = None; + let mut depth__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::OutputColumn => { + if output_column__.is_some() { + return Err(serde::de::Error::duplicate_field("outputColumn")); + } + output_column__ = map_.next_value()?; + } + GeneratedField::Depth => { + if depth__.is_some() { + return Err(serde::de::Error::duplicate_field("depth")); + } + depth__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(ColumnUnnestListRecursion { + output_column: output_column__, + depth: depth__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.ColumnUnnestListRecursion", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ColumnUnnestListRecursions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.recursions.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ColumnUnnestListRecursions", len)?; + if !self.recursions.is_empty() { + struct_ser.serialize_field("recursions", &self.recursions)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ColumnUnnestListRecursions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "recursions", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Recursions, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "recursions" => Ok(GeneratedField::Recursions), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ColumnUnnestListRecursions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ColumnUnnestListRecursions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut recursions__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Recursions => { + if recursions__.is_some() { + return Err(serde::de::Error::duplicate_field("recursions")); + } + recursions__ = Some(map_.next_value()?); + } + } + } + Ok(ColumnUnnestListRecursions { + recursions: recursions__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.ColumnUnnestListRecursions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for CopyToNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -8763,6 +9215,119 @@ impl<'de> serde::Deserialize<'de> for ListRange { deserializer.deserialize_struct("datafusion.ListRange", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ListUnnest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.index_in_input_schema != 0 { + len += 1; + } + if self.depth != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ListUnnest", len)?; + if self.index_in_input_schema != 0 { + struct_ser.serialize_field("indexInInputSchema", &self.index_in_input_schema)?; + } + if self.depth != 0 { + struct_ser.serialize_field("depth", &self.depth)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ListUnnest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "index_in_input_schema", + "indexInInputSchema", + "depth", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + IndexInInputSchema, + Depth, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "indexInInputSchema" | "index_in_input_schema" => Ok(GeneratedField::IndexInInputSchema), + "depth" => Ok(GeneratedField::Depth), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ListUnnest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ListUnnest") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut index_in_input_schema__ = None; + let mut depth__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::IndexInInputSchema => { + if index_in_input_schema__.is_some() { + return Err(serde::de::Error::duplicate_field("indexInInputSchema")); + } + index_in_input_schema__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Depth => { + if depth__.is_some() { + return Err(serde::de::Error::duplicate_field("depth")); + } + depth__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(ListUnnest { + index_in_input_schema: index_in_input_schema__.unwrap_or_default(), + depth: depth__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.ListUnnest", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ListingTableScanNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -19403,7 +19968,7 @@ impl serde::Serialize for UnnestExecNode { struct_ser.serialize_field("schema", v)?; } if !self.list_type_columns.is_empty() { - struct_ser.serialize_field("listTypeColumns", &self.list_type_columns.iter().map(ToString::to_string).collect::>())?; + struct_ser.serialize_field("listTypeColumns", &self.list_type_columns)?; } if !self.struct_type_columns.is_empty() { struct_ser.serialize_field("structTypeColumns", &self.struct_type_columns.iter().map(ToString::to_string).collect::>())?; @@ -19505,10 +20070,7 @@ impl<'de> serde::Deserialize<'de> for UnnestExecNode { if list_type_columns__.is_some() { return Err(serde::de::Error::duplicate_field("listTypeColumns")); } - list_type_columns__ = - Some(map_.next_value::>>()? - .into_iter().map(|x| x.0).collect()) - ; + list_type_columns__ = Some(map_.next_value()?); } GeneratedField::StructTypeColumns => { if struct_type_columns__.is_some() { @@ -19576,7 +20138,7 @@ impl serde::Serialize for UnnestNode { struct_ser.serialize_field("execColumns", &self.exec_columns)?; } if !self.list_type_columns.is_empty() { - struct_ser.serialize_field("listTypeColumns", &self.list_type_columns.iter().map(ToString::to_string).collect::>())?; + struct_ser.serialize_field("listTypeColumns", &self.list_type_columns)?; } if !self.struct_type_columns.is_empty() { struct_ser.serialize_field("structTypeColumns", &self.struct_type_columns.iter().map(ToString::to_string).collect::>())?; @@ -19694,10 +20256,7 @@ impl<'de> serde::Deserialize<'de> for UnnestNode { if list_type_columns__.is_some() { return Err(serde::de::Error::duplicate_field("listTypeColumns")); } - list_type_columns__ = - Some(map_.next_value::>>()? 
- .into_iter().map(|x| x.0).collect()) - ; + list_type_columns__ = Some(map_.next_value()?); } GeneratedField::StructTypeColumns => { if struct_type_columns__.is_some() { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 8d0c65f5690cb..07d9105b591a2 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -396,9 +396,9 @@ pub struct UnnestNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, repeated, tag = "2")] - pub exec_columns: ::prost::alloc::vec::Vec, - #[prost(uint64, repeated, tag = "3")] - pub list_type_columns: ::prost::alloc::vec::Vec, + pub exec_columns: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] + pub list_type_columns: ::prost::alloc::vec::Vec, #[prost(uint64, repeated, tag = "4")] pub struct_type_columns: ::prost::alloc::vec::Vec, #[prost(uint64, repeated, tag = "5")] @@ -408,6 +408,44 @@ pub struct UnnestNode { #[prost(message, optional, tag = "7")] pub options: ::core::option::Option, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ColumnUnnestListItem { + #[prost(uint32, tag = "1")] + pub input_index: u32, + #[prost(message, optional, tag = "2")] + pub recursion: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ColumnUnnestListRecursions { + #[prost(message, repeated, tag = "2")] + pub recursions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ColumnUnnestListRecursion { + #[prost(message, optional, tag = "1")] + pub output_column: ::core::option::Option, + #[prost(uint32, tag = "2")] + pub depth: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ColumnUnnestExec { + #[prost(message, optional, tag = "1")] + pub column: ::core::option::Option, + #[prost(oneof = "column_unnest_exec::UnnestType", tags = "2, 3, 4")] + pub unnest_type: ::core::option::Option, +} +/// Nested message and enum types in `ColumnUnnestExec`. 
+pub mod column_unnest_exec { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum UnnestType { + #[prost(message, tag = "2")] + List(super::ColumnUnnestListRecursions), + #[prost(message, tag = "3")] + Struct(super::super::datafusion_common::EmptyMessage), + #[prost(message, tag = "4")] + Inferred(super::super::datafusion_common::EmptyMessage), + } +} #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct UnnestOptions { #[prost(bool, tag = "1")] @@ -1100,13 +1138,20 @@ pub struct UnnestExecNode { pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, tag = "2")] pub schema: ::core::option::Option, - #[prost(uint64, repeated, tag = "3")] - pub list_type_columns: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] + pub list_type_columns: ::prost::alloc::vec::Vec, #[prost(uint64, repeated, tag = "4")] pub struct_type_columns: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "5")] pub options: ::core::option::Option, } +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ListUnnest { + #[prost(uint32, tag = "1")] + pub index_in_input_schema: u32, + #[prost(uint32, tag = "2")] + pub depth: u32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExtensionNode { #[prost(bytes = "vec", tag = "1")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index db94563b7adfa..03e5f90a4dcc0 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -19,8 +19,12 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +use crate::protobuf::column_unnest_exec::UnnestType; use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan; -use crate::protobuf::{CustomTableScanNode, SortExprNodeCollection}; +use crate::protobuf::{ + ColumnUnnestExec, ColumnUnnestListItem, ColumnUnnestListRecursion, + ColumnUnnestListRecursions, CustomTableScanNode, SortExprNodeCollection, +}; use crate::{ convert_required, into_required, protobuf::{ @@ -65,7 +69,8 @@ use datafusion_expr::{ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, WindowUDF, }; -use datafusion_expr::{AggregateUDF, Unnest}; +use datafusion_expr::{AggregateUDF, ColumnUnnestList, ColumnUnnestType, Unnest}; +use datafusion_proto_common::EmptyMessage; use self::to_proto::{serialize_expr, serialize_exprs}; use crate::logical_plan::to_proto::serialize_sorts; @@ -865,11 +870,50 @@ impl AsLogicalPlan for LogicalPlanNode { into_logical_plan!(unnest.input, ctx, extension_codec)?; Ok(datafusion_expr::LogicalPlan::Unnest(Unnest { input: Arc::new(input), - exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(), + exec_columns: unnest + .exec_columns + .iter() + .map(|c| { + ( + c.column.as_ref().unwrap().to_owned().into(), + match c.unnest_type.as_ref().unwrap() { + UnnestType::Inferred(_) => ColumnUnnestType::Inferred, + UnnestType::Struct(_) => ColumnUnnestType::Struct, + UnnestType::List(l) => ColumnUnnestType::List( + l.recursions + .iter() + .map(|ul| ColumnUnnestList { + output_column: ul + .output_column + .as_ref() + .unwrap() + .to_owned() + .into(), + depth: ul.depth as usize, + }) + .collect(), + ), + }, + ) + }) + .collect(), list_type_columns: unnest .list_type_columns .iter() - .map(|c| *c as usize) + .map(|c| { + let recursion_item = c.recursion.as_ref().unwrap(); + ( + c.input_index as _, + ColumnUnnestList { + output_column: recursion_item + .output_column + .as_ref() + .unwrap() + .into(), + 
depth: recursion_item.depth as _, + }, + ) + }) .collect(), struct_type_columns: unnest .struct_type_columns @@ -1541,15 +1585,50 @@ impl AsLogicalPlan for LogicalPlanNode { input, extension_codec, )?; + let proto_unnest_list_items = list_type_columns + .iter() + .map(|(index, ul)| ColumnUnnestListItem { + input_index: *index as _, + recursion: Some(ColumnUnnestListRecursion { + output_column: Some(ul.output_column.to_owned().into()), + depth: ul.depth as _, + }), + }) + .collect(); Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Unnest(Box::new( protobuf::UnnestNode { input: Some(Box::new(input)), - exec_columns: exec_columns.iter().map(|c| c.into()).collect(), - list_type_columns: list_type_columns + exec_columns: exec_columns .iter() - .map(|c| *c as u64) + .map(|(col, unnesting)| ColumnUnnestExec { + column: Some(col.into()), + unnest_type: Some(match unnesting { + ColumnUnnestType::Inferred => { + UnnestType::Inferred(EmptyMessage {}) + } + ColumnUnnestType::Struct => { + UnnestType::Struct(EmptyMessage {}) + } + ColumnUnnestType::List(list) => { + UnnestType::List(ColumnUnnestListRecursions { + recursions: list + .iter() + .map(|ul| ColumnUnnestListRecursion { + output_column: Some( + ul.output_column + .to_owned() + .into(), + ), + depth: ul.depth as _, + }) + .collect(), + }) + } + }), + }) .collect(), + list_type_columns: proto_unnest_list_items, struct_type_columns: struct_type_columns .iter() .map(|c| *c as u64) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6abfc71288f9d..9a6850cb21089 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -58,7 +58,7 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; -use datafusion::physical_plan::unnest::UnnestExec; +use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, @@ -78,7 +78,9 @@ use crate::physical_plan::to_proto::{ use crate::protobuf::physical_aggregate_expr_node::AggregateFunction; use crate::protobuf::physical_expr_node::ExprType; use crate::protobuf::physical_plan_node::PhysicalPlanType; -use crate::protobuf::{self, proto_error, window_agg_exec_node}; +use crate::protobuf::{ + self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, +}; use crate::{convert_required, into_required}; use self::from_proto::parse_protobuf_partitioning; @@ -1123,7 +1125,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(Arc::new(UnnestExec::new( input, - unnest.list_type_columns.iter().map(|c| *c as _).collect(), + unnest + .list_type_columns + .iter() + .map(|c| ListUnnest { + index_in_input_schema: c.index_in_input_schema as _, + depth: c.depth as _, + }) + .collect(), unnest.struct_type_columns.iter().map(|c| *c as _).collect(), Arc::new(convert_required!(unnest.schema)?), into_required!(unnest.options)?, @@ -2000,7 +2009,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { list_type_columns: exec .list_column_indices() .iter() - .map(|c| *c as _) + .map(|c| ProtoListUnnest { + index_in_input_schema: c.index_in_input_schema as _, + depth: c.depth as _, + }) .collect(), struct_type_columns: 
                        exec
                            .struct_column_indices()
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index f4b32e662ea9c..56290782b00f8 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -72,7 +72,7 @@ use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
-use datafusion::physical_plan::unnest::UnnestExec;
+use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
 use datafusion::physical_plan::windows::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec,
 };
@@ -1373,6 +1373,25 @@ fn roundtrip_unnest() -> Result<()> {
         Arc::new(Schema::new(vec![fa, fb0, fc1, fc2, fd0, fe1, fe2, fe3]));
     let input = Arc::new(EmptyExec::new(input_schema));
     let options = UnnestOptions::default();
-    let unnest = UnnestExec::new(input, vec![1, 3], vec![2, 4], output_schema, options);
+    let unnest = UnnestExec::new(
+        input,
+        vec![
+            ListUnnest {
+                index_in_input_schema: 1,
+                depth: 1,
+            },
+            ListUnnest {
+                index_in_input_schema: 1,
+                depth: 2,
+            },
+            ListUnnest {
+                index_in_input_schema: 3,
+                depth: 2,
+            },
+        ],
+        vec![2, 4],
+        output_schema,
+        options,
+    );
     roundtrip_test(Arc::new(unnest))
 }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 500932b468099..c93d9e6fc4357 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use crate::utils::{
     check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
-    resolve_columns, resolve_positions_to_exprs, transform_bottom_unnests,
+    resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnests_bottom_up,
 };
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
@@ -158,6 +158,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             &combined_schema,
             planner_context,
         )?;
+        // Aliases from the projection can conflict with same-named expressions in the input
         let mut alias_map = alias_map.clone();
         for f in base_plan.schema().fields() {
@@ -300,6 +301,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let mut intermediate_plan = input;
         let mut intermediate_select_exprs = select_exprs;
+
         // Each expr in select_exprs can contain multiple unnest stages.
         // The transformation happens bottom-up, one stage per iteration;
         // the loop only exits once no further unnest transformation is found.
@@ -314,7 +316,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
             // - unnest(array_col) will be transformed into unnest(array_col).element
             // - unnest(array_col) + 1 will be transformed into unnest(array_col).element + 1
-            let outer_projection_exprs = transform_bottom_unnests(
+            let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
                 &intermediate_plan,
                 &mut unnest_columns,
                 &mut inner_projection_exprs,
@@ -326,22 +328,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // The original expr does not contain any unnest
                 if i == 0 {
                     return LogicalPlanBuilder::from(intermediate_plan)
-                        .project(inner_projection_exprs)?
+                        .project(intermediate_select_exprs)?
                         .build();
                 }
                 break;
             } else {
-                let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
                 // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
                 let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
+
                 let plan = LogicalPlanBuilder::from(intermediate_plan)
                     .project(inner_projection_exprs)?
-                    .unnest_columns_with_options(columns, unnest_options)?
+                    .unnest_columns_recursive_with_options(
+                        unnest_columns,
+                        unnest_options,
+                    )?
                     .build()?;
                 intermediate_plan = plan;
                 intermediate_select_exprs = outer_projection_exprs;
             }
         }
+
         LogicalPlanBuilder::from(intermediate_plan)
             .project(intermediate_select_exprs)?
             .build()
@@ -368,8 +374,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
-    /// Try converting Unnest(Expr) of group by to Unnest/Projection
+    /// Try converting Unnest(Expr) of group by to Unnest/Projection.
     /// Return the new input and group_by_exprs of Aggregate.
+    /// Select exprs can be different from agg exprs.
     fn try_process_group_by_unnest(
         &self,
         agg: Aggregate,
@@ -406,7 +413,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             let mut unnest_columns = vec![];
             let mut inner_projection_exprs = vec![];
 
-            let outer_projection_exprs = transform_bottom_unnests(
+            let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
                 &intermediate_plan,
                 &mut unnest_columns,
                 &mut inner_projection_exprs,
@@ -416,7 +423,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             if unnest_columns.is_empty() {
                 break;
             } else {
-                let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
                 let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
 
                 let mut projection_exprs = match &aggr_expr_using_columns {
@@ -441,7 +447,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
                     .project(projection_exprs)?
-                    .unnest_columns_with_options(columns, unnest_options)?
+                    .unnest_columns_recursive_with_options(
+                        unnest_columns,
+                        unnest_options,
+                    )?
                     .build()?;
 
                 intermediate_select_exprs = outer_projection_exprs;
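For reference, a minimal sketch (not part of the patch itself) of how the new builder API used above can be driven directly, assuming `input: LogicalPlan` has a two-dimensional list column; the column name "c" and output name "c_unnested" are illustrative only:

    use datafusion_common::{Column, Result, UnnestOptions};
    use datafusion_expr::{ColumnUnnestList, ColumnUnnestType, LogicalPlan, LogicalPlanBuilder};

    // Unnest two levels of the 2-D list column "c" in a single Unnest node.
    fn unnest_two_levels(input: LogicalPlan) -> Result<LogicalPlan> {
        let unnestings = vec![(
            Column::from_name("c"),
            ColumnUnnestType::List(vec![ColumnUnnestList {
                output_column: Column::from_name("c_unnested"), // illustrative name
                depth: 2,
            }]),
        )];
        LogicalPlanBuilder::from(input)
            .unnest_columns_recursive_with_options(unnestings, UnnestOptions::default())?
            .build()
    }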
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 2531795a16305..656e4b851aa86 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -17,21 +17,27 @@
 //! SQL Utility Functions
 
+use std::collections::HashMap;
+use std::vec;
+
 use arrow_schema::{
     DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
 };
 use datafusion_common::tree_node::{
-    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
 };
 use datafusion_common::{
-    exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
+    exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, Result,
+    ScalarValue,
 };
-use datafusion_expr::builder::get_unnested_columns;
+use datafusion_expr::builder::get_struct_unnested_columns;
 use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
 use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
-use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
+use datafusion_expr::{
+    col, expr_vec_fmt, ColumnUnnestList, ColumnUnnestType, Expr, ExprSchemable,
+    LogicalPlan,
+};
 use sqlparser::ast::{Ident, Value};
-use std::collections::HashMap;
 
 /// Make a best-effort attempt at resolving all columns in the expression tree
 pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
@@ -287,16 +293,16 @@ pub(crate) fn value_to_string(value: &Value) -> Option<String> {
     }
 }
 
-pub(crate) fn transform_bottom_unnests(
+pub(crate) fn rewrite_recursive_unnests_bottom_up(
     input: &LogicalPlan,
-    unnest_placeholder_columns: &mut Vec<String>,
+    unnest_placeholder_columns: &mut Vec<(Column, ColumnUnnestType)>,
     inner_projection_exprs: &mut Vec<Expr>,
     original_exprs: &[Expr],
 ) -> Result<Vec<Expr>> {
     Ok(original_exprs
         .iter()
         .map(|expr| {
-            transform_bottom_unnest(
+            rewrite_recursive_unnest_bottom_up(
                 input,
                 unnest_placeholder_columns,
                 inner_projection_exprs,
                 expr,
             )
         })
@@ -309,6 +315,269 @@ pub(crate) fn transform_bottom_unnests(
         .collect::<Vec<_>>())
 }
 
+/* This rewriter is only useful when driven with a combined top-down (`f_down`)
+and bottom-up (`f_up`) traversal; a full example of how the transformation
+works is given in the `f_up` documentation below.
+ */
+struct RecursiveUnnestRewriter<'a> {
+    input_schema: &'a DFSchemaRef,
+    root_expr: &'a Expr,
+    // used to detect whether a child expr is part of an unnest operation
+    top_most_unnest: Option<Unnest>,
+    consecutive_unnest: Vec<Option<Unnest>>,
+    inner_projection_exprs: &'a mut Vec<Expr>,
+    columns_unnestings: &'a mut Vec<(Column, ColumnUnnestType)>,
+    transformed_root_exprs: Option<Vec<Expr>>,
+}
+impl<'a> RecursiveUnnestRewriter<'a> {
+    /// This struct stores the history of exprs seen during the tree traversal.
+    /// Given a history with a notation of
+    /// \[None, **Unnest(exprA)**, **Unnest(exprB)**, None, None\],
+    /// this function returns \[**Unnest(exprA)**, **Unnest(exprB)**\].
+    ///
+    /// The first item is the innermost expr.
+    fn get_latest_consecutive_unnest(&self) -> Vec<Unnest> {
+        self.consecutive_unnest
+            .iter()
+            .rev()
+            .skip_while(|item| item.is_none())
+            .take_while(|item| item.is_some())
+            .to_owned()
+            .cloned()
+            .map(|item| item.unwrap())
+            .collect()
+    }
+
+    fn transform(
+        &mut self,
+        level: usize,
+        alias_name: String,
+        expr_in_unnest: &Expr,
+        struct_allowed: bool,
+    ) -> Result<Vec<Expr>> {
+        let inner_expr_name = expr_in_unnest.schema_name().to_string();
+
+        // Full context: we are trying to plan the execution as InnerProjection->Unnest->OuterProjection.
+        // Inside the unnest execution, each column of the inner projection
+        // will be transformed into new columns, so we need to keep track of
+        // these placeholder column names.
+        let placeholder_name = format!("unnest_placeholder({})", inner_expr_name);
+        let post_unnest_name =
+            format!("unnest_placeholder({},depth={})", inner_expr_name, level);
+        // The unnest transformation should keep the original column name as is,
+        // to comply with GROUP BY and ORDER BY.
+        let placeholder_column = Column::from_name(placeholder_name.clone());
+
+        let (data_type, _) = expr_in_unnest.data_type_and_nullable(self.input_schema)?;
+
+        match data_type {
+            DataType::Struct(inner_fields) => {
+                if !struct_allowed {
+                    return internal_err!("unnest on struct can only be applied at the root level of select expression");
+                }
+                push_projection_dedupl(
+                    self.inner_projection_exprs,
+                    expr_in_unnest.clone().alias(placeholder_name.clone()),
+                );
+                self.columns_unnestings.push((
+                    Column::from_name(placeholder_name.clone()),
+                    ColumnUnnestType::Struct,
+                ));
+                Ok(
+                    get_struct_unnested_columns(&placeholder_name, &inner_fields)
+                        .into_iter()
+                        .map(Expr::Column)
+                        .collect(),
+                )
+            }
+            DataType::List(_)
+            | DataType::FixedSizeList(_, _)
+            | DataType::LargeList(_) => {
+                push_projection_dedupl(
+                    self.inner_projection_exprs,
+                    expr_in_unnest.clone().alias(placeholder_name.clone()),
+                );
+
+                let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name);
+                match self
+                    .columns_unnestings
+                    .iter_mut()
+                    .find(|(inner_col, _)| inner_col == &placeholder_column)
+                {
+                    // No unnesting has been done on this column yet
+                    None => {
+                        self.columns_unnestings.push((
+                            Column::from_name(placeholder_name.clone()),
+                            ColumnUnnestType::List(vec![ColumnUnnestList {
+                                output_column: Column::from_name(post_unnest_name),
+                                depth: level,
+                            }]),
+                        ));
+                    }
+                    // Some unnesting (at some level) has already been done on this column,
+                    // e.g. select unnest(column3), unnest(unnest(column3))
+                    Some((_, unnesting)) => match unnesting {
+                        ColumnUnnestType::List(list) => {
+                            let unnesting = ColumnUnnestList {
+                                output_column: Column::from_name(post_unnest_name),
+                                depth: level,
+                            };
+                            if !list.contains(&unnesting) {
+                                list.push(unnesting);
+                            }
+                        }
+                        _ => {
+                            return internal_err!("not reached");
+                        }
+                    },
+                }
+                Ok(vec![post_unnest_expr])
+            }
+            _ => {
+                internal_err!("unnest on non-list or struct type is not supported")
+            }
+        }
+    }
+}
+
+impl<'a> TreeNodeRewriter for RecursiveUnnestRewriter<'a> {
+    type Node = Expr;
+
+    /// This downward traversal needs to keep track of:
+    /// - Whether any unnest expr has been visited from the top until the current node
+    /// - If some unnest expr has been visited, a stack of such information is maintained;
+    ///   this is used to detect a recursive unnest expr (e.g. **unnest(unnest(unnest(3d_column)))**)
+    fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
+        if let Expr::Unnest(ref unnest_expr) = expr {
+            let (data_type, _) =
+                unnest_expr.expr.data_type_and_nullable(self.input_schema)?;
+            self.consecutive_unnest.push(Some(unnest_expr.clone()));
+            // If the expr inside the unnest is a struct, do not consider
+            // the next unnest (if any) as consecutive with this one:
+            // unnest(unnest(struct_arr_col)) cannot be interpreted as
+            // unnest(struct_arr_col, depth := 2) but has to be split into
+            // multiple unnest logical plans instead, i.e.:
+            // - unnest(struct_col)
+            //   unnest(struct_arr_col) as struct_col
+
+            if let DataType::Struct(_) = data_type {
+                self.consecutive_unnest.push(None);
+            }
+            if self.top_most_unnest.is_none() {
+                self.top_most_unnest = Some(unnest_expr.clone());
+            }
+
+            Ok(Transformed::no(expr))
+        } else {
+            self.consecutive_unnest.push(None);
+            Ok(Transformed::no(expr))
+        }
+    }
+
+    /// The rewriting only happens when the traversal has reached the top-most unnest expr
+    /// within a sequence of consecutive unnest exprs.
+    ///
+    /// For example, for an expr of **unnest(unnest(column1)) + unnest(unnest(unnest(column2)))**:
+    /// ```text
+    ///                 ┌──────────────────┐
+    ///                 │    binaryexpr    │
+    ///                 │                  │
+    ///                 └──────────────────┘
+    ///        f_down  / /            │ │
+    ///               / / f_up        │ │
+    ///              / /       f_down │ │ f_up
+    ///         unnest                │ │
+    ///                               │ │
+    /// f_down / / f_up(rewriting)    │ │
+    ///       / /
+    ///      / /                    unnest
+    ///  unnest
+    ///                  f_down / / f_up(rewriting)
+    /// f_down / / f_up        / /
+    ///       / /             / /
+    ///      / /           unnest
+    ///  column1
+    ///           f_down / / f_up
+    ///                 / /
+    ///                / /
+    ///            column2
+    /// ```
+    fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
+        if let Expr::Unnest(ref traversing_unnest) = expr {
+            if traversing_unnest == self.top_most_unnest.as_ref().unwrap() {
+                self.top_most_unnest = None;
+            }
+            // Find the sequence of consecutive unnest exprs inside consecutive_unnest,
+            // and check whether the current upward traversal is returning to the root expr.
+            // For example, given an expr `unnest(unnest(col))` the traversal happens as:
+            // down(unnest) -> down(unnest) -> down(col) -> up(col) -> up(unnest) -> up(unnest)
+            // and the result of such a traversal is unnest(col, depth := 2).
+            let unnest_stack = self.get_latest_consecutive_unnest();
+
+            // This traversal has reached the top-most unnest again,
+            // e.g. Unnest(top) -> Unnest(2nd) -> Column(bottom)
+            //      -> Unnest(2nd) -> Unnest(top) a.k.a. here.
+            // Thus Unnest(Unnest(some_col)) is rewritten into Unnest(some_col, depth := 2).
+            if traversing_unnest == unnest_stack.last().unwrap() {
+                let most_inner = unnest_stack.first().unwrap();
+                let inner_expr = most_inner.expr.as_ref();
+                // unnest(unnest(struct_arr_col)) is not allowed to be done recursively;
+                // it needs to be split into multiple unnest logical plans:
+                //   unnest(struct_arr)
+                //     unnest(struct_arr_col) as struct_arr
+                // instead of unnest(struct_arr_col, depth = 2).
+
+                let unnest_recursion = unnest_stack.len();
+                let struct_allowed = (&expr == self.root_expr) && unnest_recursion == 1;
+
+                let mut transformed_exprs = self.transform(
+                    unnest_recursion,
+                    expr.schema_name().to_string(),
+                    inner_expr,
+                    struct_allowed,
+                )?;
+                if struct_allowed {
+                    self.transformed_root_exprs = Some(transformed_exprs.clone());
+                }
+                return Ok(Transformed::new(
+                    transformed_exprs.swap_remove(0),
+                    true,
+                    TreeNodeRecursion::Continue,
+                ));
+            }
+        } else {
+            self.consecutive_unnest.push(None);
+        }
+
+        // For column exprs that are not descendants of any unnest node,
+        // retain their projection;
+        // e.g. given the expr tree unnest(col_a) + col_b, we have to retain the projection of col_b.
+        // This condition can be checked by maintaining an Option of the top-most unnest.
+        if matches!(&expr, Expr::Column(_)) && self.top_most_unnest.is_none() {
+            push_projection_dedupl(self.inner_projection_exprs, expr.clone());
+        }
+
+        Ok(Transformed::no(expr))
+    }
+}
+
+fn push_projection_dedupl(projection: &mut Vec<Expr>, expr: Expr) {
+    let schema_name = expr.schema_name().to_string();
+    if !projection
+        .iter()
+        .any(|e| e.schema_name().to_string() == schema_name)
+    {
+        projection.push(expr);
+    }
+}
 /// The context is we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection.
 /// Given an expression which contains an unnest expr as one of its children,
 /// how the transformation works depends on the unnest type:
@@ -318,37 +587,22 @@ pub(crate) fn transform_bottom_unnests(
 /// The transformed exprs will be used in the outer projection.
 /// If along the path from root to bottom, there are multiple unnest expressions, the transformation
 /// is done only for the bottom expression
-pub(crate) fn transform_bottom_unnest(
+pub(crate) fn rewrite_recursive_unnest_bottom_up(
     input: &LogicalPlan,
-    unnest_placeholder_columns: &mut Vec<String>,
+    unnest_placeholder_columns: &mut Vec<(Column, ColumnUnnestType)>,
     inner_projection_exprs: &mut Vec<Expr>,
     original_expr: &Expr,
 ) -> Result<Vec<Expr>> {
-    let mut transform =
-        |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
-            // Full context, we are trying to plan the execution as InnerProjection->Unnest->OuterProjection
-            // inside unnest execution, each column inside the inner projection
-            // will be transformed into new columns. Thus we need to keep track of these placeholding column names
-            let placeholder_name = unnest_expr.schema_name().to_string();
-
-            unnest_placeholder_columns.push(placeholder_name.clone());
-            // Add alias for the argument expression, to avoid naming conflicts
-            // with other expressions in the select list. For example: `select unnest(col1), col1 from t`.
-            // this extra projection is used to unnest transforming
-            inner_projection_exprs
-                .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
-            let schema = input.schema();
-
-            let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?;
-
-            let outer_projection_columns =
-                get_unnested_columns(&placeholder_name, &data_type)?;
-            let expr = outer_projection_columns
-                .iter()
-                .map(|col| Expr::Column(col.0.clone()))
-                .collect::<Vec<_>>();
-            Ok(expr)
-        };
+    let mut rewriter = RecursiveUnnestRewriter {
+        input_schema: input.schema(),
+        root_expr: original_expr,
+        top_most_unnest: None,
+        consecutive_unnest: vec![],
+        inner_projection_exprs,
+        columns_unnestings: unnest_placeholder_columns,
+        transformed_root_exprs: None,
+    };
+
     // This transformation is only done for list unnest;
     // struct unnest is done at the root level, and at a later stage,
     // because the syntax of TreeNode only supports transforming into 1 Expr, while
@@ -362,68 +616,151 @@ pub(crate) fn transform_bottom_unnest(
         data: transformed_expr,
         transformed,
         tnr: _,
-    } = original_expr.clone().transform_up(|expr: Expr| {
-        let is_root_expr = &expr == original_expr;
-        // Root expr is transformed separately
-        if is_root_expr {
-            return Ok(Transformed::no(expr));
-        }
-        if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
-            let (data_type, _) = arg.data_type_and_nullable(input.schema())?;
-
-            if let DataType::Struct(_) = data_type {
-                return internal_err!("unnest on struct can only be applied at the root level of select expression");
-            }
-
-            let mut transformed_exprs = transform(&expr, arg)?;
-            // root_expr.push(transformed_exprs[0].clone());
-            Ok(Transformed::new(
-                transformed_exprs.swap_remove(0),
-                true,
-                TreeNodeRecursion::Stop,
-            ))
-        } else {
-            Ok(Transformed::no(expr))
-        }
-    })?;
+    } = original_expr.clone().rewrite(&mut rewriter)?;
 
     if !transformed {
-        // Because root expr need to transform separately
-        // unnest struct is only possible here
-        // The transformation looks like
-        // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
-        if let Expr::Unnest(Unnest { expr: ref arg }) = transformed_expr {
-            return transform(&transformed_expr, arg);
-        }
-
         if matches!(&transformed_expr, Expr::Column(_)) {
-            inner_projection_exprs.push(transformed_expr.clone());
+            push_projection_dedupl(inner_projection_exprs, transformed_expr.clone());
             Ok(vec![transformed_expr])
         } else {
             // We need to evaluate the expr in the inner projection;
             // the outer projection just selects its name
             let column_name = transformed_expr.schema_name().to_string();
-            inner_projection_exprs.push(transformed_expr);
+            push_projection_dedupl(inner_projection_exprs, transformed_expr);
             Ok(vec![Expr::Column(Column::from_name(column_name))])
         }
     } else {
+        if let Some(transformed_root_exprs) = rewriter.transformed_root_exprs {
+            return Ok(transformed_root_exprs);
+        }
         Ok(vec![transformed_expr])
     }
 }
 
-// write test for recursive_transform_unnest
 #[cfg(test)]
 mod tests {
     use std::{ops::Add, sync::Arc};
 
     use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
     use arrow_schema::Fields;
-    use datafusion_common::{DFSchema, Result};
-    use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
+    use datafusion_common::{Column, DFSchema, Result};
+    use datafusion_expr::{
+        col, lit, unnest, ColumnUnnestType, EmptyRelation, LogicalPlan,
+    };
     use datafusion_functions::core::expr_ext::FieldAccessor;
     use datafusion_functions_aggregate::expr_fn::count;
 
-    use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest};
+    use crate::utils::{resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up};
+
+    fn column_unnests_eq(l: Vec<(&str, &str)>, r: &[(Column, ColumnUnnestType)]) {
+        let r_formatted: Vec<String> =
+            r.iter().map(|i| format!("{}|{}", i.0, i.1)).collect();
+        let l_formatted: Vec<String> =
+            l.iter().map(|i| format!("{}|{}", i.0, i.1)).collect();
+        assert_eq!(l_formatted, r_formatted);
+    }
+
+    #[test]
+    fn test_transform_bottom_unnest_recursive() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new(
+                "3d_col",
+                ArrowDataType::List(Arc::new(Field::new(
+                    "2d_col",
+                    ArrowDataType::List(Arc::new(Field::new(
+                        "elements",
+                        ArrowDataType::Int64,
+                        true,
+                    ))),
+                    true,
+                ))),
+                true,
+            ),
+            Field::new("i64_col", ArrowDataType::Int64, true),
+        ]);
+
+        let dfschema = DFSchema::try_from(schema)?;
+
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(dfschema),
+        });
+
+        let mut unnest_placeholder_columns = vec![];
+        let mut inner_projection_exprs = vec![];
+
+        // unnest(unnest(3d_col)) + unnest(unnest(3d_col))
+        let original_expr = unnest(unnest(col("3d_col")))
+            .add(unnest(unnest(col("3d_col"))))
+            .add(col("i64_col"));
+        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            &original_expr,
+        )?;
+        // only the bottom-most unnest exprs are transformed
+        assert_eq!(
+            transformed_exprs,
+            vec![col("unnest_placeholder(3d_col,depth=2)")
+                .alias("UNNEST(UNNEST(3d_col))")
+                .add(
+                    col("unnest_placeholder(3d_col,depth=2)")
+                        .alias("UNNEST(UNNEST(3d_col))")
+                )
+                .add(col("i64_col"))]
+        );
+        column_unnests_eq(
+            vec![(
+                "unnest_placeholder(3d_col)",
+                "List([unnest_placeholder(3d_col,depth=2)|depth=2])",
+            )],
+            &unnest_placeholder_columns,
+        );
+
+        // 3d_col is still referenced by its original-schema name, but aliased,
+        // to avoid colliding with any projection on the column itself
+        assert_eq!(
+            inner_projection_exprs,
+            vec![
+                col("3d_col").alias("unnest_placeholder(3d_col)"),
+                col("i64_col")
+            ]
+        );
+
+        // unnest(3d_col) as 2d_col
+        let original_expr_2 =
+            unnest(col("3d_col")).alias("2d_col");
+        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            &original_expr_2,
+        )?;
+
+        assert_eq!(
+            transformed_exprs,
+            vec![
+                (col("unnest_placeholder(3d_col,depth=1)").alias("UNNEST(3d_col)"))
+                    .alias("2d_col")
+            ]
+        );
+        column_unnests_eq(
+            vec![(
+                "unnest_placeholder(3d_col)",
+                "List([unnest_placeholder(3d_col,depth=2)|depth=2, unnest_placeholder(3d_col,depth=1)|depth=1])",
+            )],
+            &unnest_placeholder_columns,
+        );
+        // 3d_col is still referenced by its original-schema name, but aliased,
+        // to avoid colliding with any projection on the column itself
+        assert_eq!(
+            inner_projection_exprs,
+            vec![
+                col("3d_col").alias("unnest_placeholder(3d_col)"),
+                col("i64_col")
+            ]
+        );
+
+        Ok(())
+    }
 
     #[test]
     fn test_transform_bottom_unnest() -> Result<()> {
@@ -460,7 +797,7 @@ mod tests {
 
         // unnest(struct_col)
         let original_expr = unnest(col("struct_col"));
-        let transformed_exprs = transform_bottom_unnest(
+        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
             &input,
             &mut unnest_placeholder_columns,
             &mut inner_projection_exprs,
@@ -469,34 +806,45 @@ mod tests {
         assert_eq!(
             transformed_exprs,
             vec![
-                col("UNNEST(struct_col).field1"),
-                col("UNNEST(struct_col).field2"),
+                col("unnest_placeholder(struct_col).field1"),
+                col("unnest_placeholder(struct_col).field2"),
             ]
         );
-        assert_eq!(unnest_placeholder_columns, vec!["UNNEST(struct_col)"]);
+        column_unnests_eq(
+            vec![("unnest_placeholder(struct_col)", "Struct")],
+            &unnest_placeholder_columns,
+        );
         // still reference struct_col in original schema but with alias,
         // to avoid colliding with the projection on the column itself if any
         assert_eq!(
             inner_projection_exprs,
-            vec![col("struct_col").alias("UNNEST(struct_col)"),]
+            vec![col("struct_col").alias("unnest_placeholder(struct_col)"),]
         );
 
         // unnest(array_col) + 1
         let original_expr = unnest(col("array_col")).add(lit(1i64));
-        let transformed_exprs = transform_bottom_unnest(
+        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
             &input,
             &mut unnest_placeholder_columns,
             &mut inner_projection_exprs,
             &original_expr,
         )?;
-        assert_eq!(
-            unnest_placeholder_columns,
-            vec!["UNNEST(struct_col)", "UNNEST(array_col)"]
+        column_unnests_eq(
+            vec![
+                ("unnest_placeholder(struct_col)", "Struct"),
+                (
+                    "unnest_placeholder(array_col)",
+                    "List([unnest_placeholder(array_col,depth=1)|depth=1])",
+                ),
+            ],
+            &unnest_placeholder_columns,
        );
         // only transform the unnest children
         assert_eq!(
             transformed_exprs,
-            vec![col("UNNEST(array_col)").add(lit(1i64))]
+            vec![col("unnest_placeholder(array_col,depth=1)")
+                .alias("UNNEST(array_col)")
+                .add(lit(1i64))]
         );
 
         // keep appending to the current vector
@@ -505,8 +853,8 @@ mod tests {
         assert_eq!(
             inner_projection_exprs,
             vec![
-                col("struct_col").alias("UNNEST(struct_col)"),
-                col("array_col").alias("UNNEST(array_col)")
+                col("struct_col").alias("unnest_placeholder(struct_col)"),
+                col("array_col").alias("unnest_placeholder(array_col)")
             ]
         );
 
@@ -544,7 +892,7 @@ mod tests {
 
         // An expr with multiple unnest
        let original_expr = unnest(unnest(col("struct_col").field("matrix")));
-        let transformed_exprs = transform_bottom_unnest(
+        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
             &input,
             &mut unnest_placeholder_columns,
             &mut inner_projection_exprs,
@@ -553,17 +901,25 @@ mod tests {
         // Only the innermost / bottom-most unnest is transformed
         assert_eq!(
             transformed_exprs,
-            vec![unnest(col("UNNEST(struct_col[matrix])"))]
+            vec![col("unnest_placeholder(struct_col[matrix],depth=2)")
+                .alias("UNNEST(UNNEST(struct_col[matrix]))")]
         );
-        assert_eq!(
-            unnest_placeholder_columns,
-            vec!["UNNEST(struct_col[matrix])"]
+        // TODO: add a test case where
+        // unnest -> field access -> unnest
+        column_unnests_eq(
+            vec![(
+                "unnest_placeholder(struct_col[matrix])",
+                "List([unnest_placeholder(struct_col[matrix],depth=2)|depth=2])",
+            )],
+            &unnest_placeholder_columns,
         );
+
         assert_eq!(
             inner_projection_exprs,
             vec![col("struct_col")
                 .field("matrix")
-                .alias("UNNEST(struct_col[matrix])"),]
+                .alias("unnest_placeholder(struct_col[matrix])"),]
         );
 
         Ok(())
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index 5ef70d2abe5e6..76f19a23a0aec 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -478,11 +478,11 @@ fn test_unnest_logical_plan() -> Result<()> {
     };
 
     let sql_to_rel = SqlToRel::new(&context);
     let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();
-
-    let expected = "Projection: UNNEST(unnest_table.struct_col).field1, UNNEST(unnest_table.struct_col).field2, UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
-        \n  Unnest: lists[UNNEST(unnest_table.array_col)] structs[UNNEST(unnest_table.struct_col)]\
-        \n    Projection: unnest_table.struct_col AS UNNEST(unnest_table.struct_col), unnest_table.array_col AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
-        \n      TableScan: unnest_table";
+    let expected = r#"
+Projection: unnest_placeholder(unnest_table.struct_col).field1, unnest_placeholder(unnest_table.struct_col).field2, unnest_placeholder(unnest_table.array_col,depth=1) AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
+  Unnest: lists[unnest_placeholder(unnest_table.array_col)|depth=1] structs[unnest_placeholder(unnest_table.struct_col)]
+    Projection: unnest_table.struct_col AS unnest_placeholder(unnest_table.struct_col), unnest_table.array_col AS unnest_placeholder(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
+      TableScan: unnest_table"#.trim_start();
 
     assert_eq!(format!("{plan}"), expected);
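The subtle part of the rewriter above is the `consecutive_unnest` bookkeeping: `f_down` pushes `None` for every non-unnest node, so the latest run of `Some`s, read from the back of the history, is exactly the chain of directly nested unnest exprs. A standalone sketch of that extraction logic (with `char` standing in for `Unnest`; the function name is illustrative, not from the patch):

    /// Mirrors `get_latest_consecutive_unnest`: from the back of `history`,
    /// skip trailing `None`s, then take the run of `Some`s, so the innermost
    /// (last-pushed) element comes first.
    fn latest_consecutive<T: Clone>(history: &[Option<T>]) -> Vec<T> {
        history
            .iter()
            .rev()
            .skip_while(|item| item.is_none())
            .take_while(|item| item.is_some())
            .map(|item| item.clone().unwrap())
            .collect()
    }

    fn main() {
        // f_down pushes the outer unnest ('a') before the inner one ('b'),
        // then a None for a trailing non-unnest node.
        let history = vec![None, Some('a'), Some('b'), None];
        assert_eq!(latest_consecutive(&history), vec!['b', 'a']);
    }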
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 7d0262952b313..6196087dd5302 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4055,9 +4055,9 @@ logical_plan
 03)----TableScan: join_t1 projection=[t1_id, t1_name]
 04)--SubqueryAlias: series
 05)----Subquery:
-06)------Projection: UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int))) AS i
-07)--------Unnest: lists[UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int)))] structs[]
-08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int)))
+06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
+07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
+08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
 09)------------EmptyRelation
 
@@ -4077,9 +4077,9 @@ logical_plan
 03)----TableScan: join_t1 projection=[t1_id, t1_name]
 04)--SubqueryAlias: series
 05)----Subquery:
-06)------Projection: UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int))) AS i
-07)--------Unnest: lists[UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int)))] structs[]
-08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int)))
+06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
+07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
+08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
 09)------------EmptyRelation
 
diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt
index 2d74c1fc69945..86aa07b04ce1d 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter.slt
@@ -36,9 +36,9 @@ query TT
 explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
 ----
 logical_plan
-01)Projection: UNNEST(v.column2) AS uc2
-02)--Unnest: lists[UNNEST(v.column2)] structs[]
-03)----Projection: v.column2 AS UNNEST(v.column2), v.column1
+01)Projection: unnest_placeholder(v.column2,depth=1) AS uc2
+02)--Unnest: lists[unnest_placeholder(v.column2)|depth=1] structs[]
+03)----Projection: v.column2 AS unnest_placeholder(v.column2), v.column1
 04)------Filter: v.column1 = Int64(2)
 05)--------TableScan: v projection=[column1, column2]
 
@@ -53,11 +53,11 @@ query TT
 explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
 ----
 logical_plan
-01)Projection: UNNEST(v.column2) AS uc2
-02)--Filter: UNNEST(v.column2) > Int64(3)
-03)----Projection: UNNEST(v.column2)
-04)------Unnest: lists[UNNEST(v.column2)] structs[]
-05)--------Projection: v.column2 AS UNNEST(v.column2), v.column1
+01)Projection: unnest_placeholder(v.column2,depth=1) AS uc2
+02)--Filter: unnest_placeholder(v.column2,depth=1) > Int64(3)
+03)----Projection: unnest_placeholder(v.column2,depth=1)
+04)------Unnest: lists[unnest_placeholder(v.column2)|depth=1] structs[]
+05)--------Projection: v.column2 AS unnest_placeholder(v.column2), v.column1
 06)----------TableScan: v projection=[column1, column2]
 
 query II
@@ -71,10 +71,10 @@ query TT
 explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
 ----
 logical_plan
-01)Projection: UNNEST(v.column2) AS uc2, v.column1
-02)--Filter: UNNEST(v.column2) > Int64(3)
-03)----Unnest: lists[UNNEST(v.column2)] structs[]
-04)------Projection: v.column2 AS UNNEST(v.column2), v.column1
+01)Projection: unnest_placeholder(v.column2,depth=1) AS uc2, v.column1
+02)--Filter: unnest_placeholder(v.column2,depth=1) > Int64(3)
+03)----Unnest: lists[unnest_placeholder(v.column2)|depth=1] structs[]
+04)------Projection: v.column2 AS unnest_placeholder(v.column2), v.column1
 05)--------Filter: v.column1 = Int64(2)
 06)----------TableScan: v projection=[column1, column2]
 
@@ -90,10 +90,10 @@ query TT
 explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
 ----
 logical_plan
-01)Projection: UNNEST(v.column2) AS uc2, v.column1
-02)--Filter: UNNEST(v.column2) > Int64(3) OR v.column1 = Int64(2)
-03)----Unnest: lists[UNNEST(v.column2)] structs[]
-04)------Projection: v.column2 AS UNNEST(v.column2), v.column1
+01)Projection: unnest_placeholder(v.column2,depth=1) AS uc2, v.column1
+02)--Filter: unnest_placeholder(v.column2,depth=1)
 > Int64(3) OR v.column1 = Int64(2)
+03)----Unnest: lists[unnest_placeholder(v.column2)|depth=1] structs[]
+04)------Projection: v.column2 AS unnest_placeholder(v.column2), v.column1
 05)--------TableScan: v projection=[column1, column2]
 
 statement ok
@@ -112,10 +112,10 @@ query TT
 explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
 ----
 logical_plan
-01)Projection: d.column1, UNNEST(d.column2) AS o
-02)--Filter: get_field(UNNEST(d.column2), Utf8("a")) = Int64(1)
-03)----Unnest: lists[UNNEST(d.column2)] structs[]
-04)------Projection: d.column1, d.column2 AS UNNEST(d.column2)
+01)Projection: d.column1, unnest_placeholder(d.column2,depth=1) AS o
+02)--Filter: get_field(unnest_placeholder(d.column2,depth=1), Utf8("a")) = Int64(1)
+03)----Unnest: lists[unnest_placeholder(d.column2)|depth=1] structs[]
+04)------Projection: d.column1, d.column2 AS unnest_placeholder(d.column2)
 05)--------TableScan: d projection=[column1, column2]
 
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index afa576d127468..63ca74e9714c7 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -165,6 +165,34 @@ select unnest(column1), column1 from unnest_table;
 6 [6]
 12 [12]
 
+# unnest at different levels at the same time
+query II
+select unnest([1,2,3]), unnest(unnest([[1,2,3]]));
+----
+1 1
+2 2
+3 3
+
+# binary expr linking different unnest exprs
+query II
+select unnest([1,2,3]) + unnest([1,2,3]), unnest([1,2,3]) + unnest([4,5]);
+----
+2 5
+4 7
+6 NULL
+
+# binary expr linking different recursive unnest exprs
+query III
+select unnest(unnest([[1,2,3]])) + unnest(unnest([[1,2,3]])), unnest(unnest([[1,2,3]])) + unnest([4,5]), unnest([4,5]);
+----
+2 5 4
+4 7 5
+6 NULL NULL
+
 ## unnest as children of other expr
 query I?
 select unnest(column1) + 1 , column1 from unnest_table;
@@ -497,10 +525,45 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
 6
 
 ## FIXME: https://github.com/apache/datafusion/issues/11198
-query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
+query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(unnest_table.column1\)" at position 0 and "UNNEST\(unnest_table.column1\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
 select unnest(column1), unnest(column1) from unnest_table;
 
+## the same unnest expr is referenced multiple times (unnest is the bottom-most expr)
+query ??II
+select unnest(column2), unnest(unnest(column2)), unnest(unnest(unnest(column2))), unnest(unnest(unnest(column2))) + 1 from recursive_unnest_table;
+----
+[[1], [2]] [1] 1 2
+[[1, 1]] [2] NULL NULL
+[[1], [2]] [1, 1] 2 3
+[[1, 1]] NULL NULL NULL
+[[1], [2]] [1] 1 2
+[[1, 1]] [2] 1 2
+[[1], [2]] [1, 1] NULL NULL
+[[1, 1]] NULL NULL NULL
+[[3, 4], [5]] [3, 4] 3 4
+[[, 6], , [7, 8]] [5] 4 5
+[[3, 4], [5]] [, 6] 5 6
+[[, 6], , [7, 8]] NULL NULL NULL
+NULL [7, 8] NULL NULL
+[[3, 4], [5]] [3, 4] NULL NULL
+[[, 6], , [7, 8]] [5] 6 7
+[[3, 4], [5]] [, 6] NULL NULL
+[[, 6], , [7, 8]] NULL NULL NULL
+NULL [7, 8] NULL NULL
+[[3, 4], [5]] NULL 7 8
+[[, 6], , [7, 8]] NULL 8 9
+
+## the same composite expr (unnest(field_access(unnest(col)))) containing unnest is referenced multiple times
+query ??II
+select unnest(column3), unnest(column3)['c0'], unnest(unnest(column3)['c0']), unnest(unnest(column3)['c0']) + unnest(unnest(column3)['c0']) from recursive_unnest_table;
+----
+{c0: [1], c1: [[1, 2]]} [1] 1 2
+{c0: [2], c1: [[3], [4]]} [2] 2 4
+
 ## unnest list followed by unnest struct
 query ???
 select unnest(unnest(column3)), column3 from recursive_unnest_table;
@@ -508,6 +571,24 @@ select unnest(unnest(column3)), column3 from recursive_unnest_table;
 [1] [[1, 2]] [{c0: [1], c1: [[1, 2]]}]
 [2] [[3], [4]] [{c0: [2], c1: [[3], [4]]}]
 
+query TT
+explain select unnest(unnest(column3)), column3 from recursive_unnest_table;
+----
+logical_plan
+01)Unnest: lists[] structs[unnest_placeholder(UNNEST(recursive_unnest_table.column3))]
+02)--Projection: unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3) AS unnest_placeholder(UNNEST(recursive_unnest_table.column3)), recursive_unnest_table.column3
+03)----Unnest: lists[unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[]
+04)------Projection: recursive_unnest_table.column3 AS unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3
+05)--------TableScan: recursive_unnest_table projection=[column3]
+physical_plan
+01)UnnestExec
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----ProjectionExec: expr=[unnest_placeholder(recursive_unnest_table.column3,depth=1)@0 as unnest_placeholder(UNNEST(recursive_unnest_table.column3)), column3@1 as column3]
+04)------UnnestExec
+05)--------ProjectionExec: expr=[column3@0 as unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
+06)----------MemoryExec: partitions=1, partition_sizes=[1]
+
 ## unnest->field_access->unnest->unnest
 query I?
 select unnest(unnest(unnest(column3)['c1'])), column3 from recursive_unnest_table;
@@ -534,27 +615,39 @@
 NULL [[[3, 4], [5]], [[, 6], , [7, 8]]]
 8 [[[3, 4], [5]], [[, 6], , [7, 8]]]
 
+query I??
+select unnest(unnest(unnest(column3)['c1'])), unnest(unnest(column3)['c1']), column3 from recursive_unnest_table;
+----
+1 [1, 2] [{c0: [1], c1: [[1, 2]]}]
+2 NULL [{c0: [1], c1: [[1, 2]]}]
+3 [3] [{c0: [2], c1: [[3], [4]]}]
+NULL [4] [{c0: [2], c1: [[3], [4]]}]
+4 [3] [{c0: [2], c1: [[3], [4]]}]
+NULL [4] [{c0: [2], c1: [[3], [4]]}]
+
+## demonstrate a case where recursive unnest is impossible
+## and multiple unnest logical plans are needed,
+## e.g. unnest -> field_access -> unnest
 query TT
 explain select unnest(unnest(unnest(column3)['c1'])), column3 from recursive_unnest_table;
 ----
 logical_plan
-01)Unnest: lists[UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1]))] structs[]
-02)--Projection: UNNEST(UNNEST(recursive_unnest_table.column3)[c1]) AS UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), recursive_unnest_table.column3
-03)----Unnest: lists[UNNEST(UNNEST(recursive_unnest_table.column3)[c1])] structs[]
-04)------Projection: get_field(UNNEST(recursive_unnest_table.column3), Utf8("c1")) AS UNNEST(UNNEST(recursive_unnest_table.column3)[c1]), recursive_unnest_table.column3
-05)--------Unnest: lists[UNNEST(recursive_unnest_table.column3)] structs[]
-06)----------Projection: recursive_unnest_table.column3 AS UNNEST(recursive_unnest_table.column3), recursive_unnest_table.column3
-07)------------TableScan: recursive_unnest_table projection=[column3]
+01)Projection: unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2) AS UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), recursive_unnest_table.column3
+02)--Unnest: lists[unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1])|depth=2] structs[]
+03)----Projection: get_field(unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3), Utf8("c1")) AS unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), recursive_unnest_table.column3
+04)------Unnest: lists[unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[]
+05)--------Projection: recursive_unnest_table.column3 AS unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3
+06)----------TableScan: recursive_unnest_table projection=[column3]
 physical_plan
-01)UnnestExec
-02)--ProjectionExec: expr=[UNNEST(UNNEST(recursive_unnest_table.column3)[c1])@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
-03)----UnnestExec
-04)------ProjectionExec: expr=[get_field(UNNEST(recursive_unnest_table.column3)@0, c1) as UNNEST(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------UnnestExec
-07)------------ProjectionExec: expr=[column3@0 as UNNEST(recursive_unnest_table.column3), column3@0 as column3]
-08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+01)ProjectionExec: expr=[unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
+02)--UnnestExec
+03)----ProjectionExec: expr=[get_field(unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------UnnestExec
+06)----------ProjectionExec: expr=[column3@0 as unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
+07)------------MemoryExec: partitions=1, partition_sizes=[1]
+
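+# a minimal sketch of the depth semantics above, using literal arrays for
+# brevity: nesting unnest twice fully flattens a 2-D list and is expected to
+# be planned as a single depth=2 list unnest rather than two Unnest nodes
+query I
+select unnest(unnest([[1,2],[3]]));
+----
+1
+2
+3
+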
+
 
 ## group by unnest