diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 74d0de507e4c9..c0e512ffe57b2 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -20,6 +20,8 @@ //! projections one by one if the operator below is amenable to this. If a //! projection reaches a source, it can even dissappear from the plan entirely. +use std::sync::Arc; + use super::output_requirements::OutputRequirementExec; use super::PhysicalOptimizerRule; use crate::datasource::physical_plan::CsvExec; @@ -39,7 +41,6 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{Distribution, ExecutionPlan}; use arrow_schema::SchemaRef; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::JoinSide; @@ -47,10 +48,10 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; +use datafusion_physical_plan::streaming::StreamingTableExec; use datafusion_physical_plan::union::UnionExec; use itertools::Itertools; -use std::sync::Arc; /// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to /// remove or swap with its child. @@ -135,6 +136,8 @@ pub fn remove_unnecessary_projections( try_swapping_with_sort_merge_join(projection, sm_join)? } else if let Some(sym_join) = input.downcast_ref::() { try_swapping_with_sym_hash_join(projection, sym_join)? + } else if let Some(ste) = input.downcast_ref::() { + try_swapping_with_streaming_table(projection, ste)? } else { // If the input plan of the projection is not one of the above, we // conservatively assume that pushing the projection down may hurt. @@ -149,8 +152,8 @@ pub fn remove_unnecessary_projections( Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes)) } -/// Tries to swap `projection` with its input (`csv`). If possible, performs -/// the swap and returns [`CsvExec`] as the top plan. Otherwise, returns `None`. +/// Tries to embed `projection` to its input (`csv`). If possible, returns +/// [`CsvExec`] as the top plan. Otherwise, returns `None`. fn try_swapping_with_csv( projection: &ProjectionExec, csv: &CsvExec, @@ -174,8 +177,8 @@ fn try_swapping_with_csv( }) } -/// Tries to swap `projection` with its input (`memory`). If possible, performs -/// the swap and returns [`MemoryExec`] as the top plan. Otherwise, returns `None`. +/// Tries to embed `projection` to its input (`memory`). If possible, returns +/// [`MemoryExec`] as the top plan. Otherwise, returns `None`. fn try_swapping_with_memory( projection: &ProjectionExec, memory: &MemoryExec, @@ -197,10 +200,52 @@ fn try_swapping_with_memory( .transpose() } +/// Tries to embed `projection` to its input (`streaming table`). +/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise, +/// returns `None`. +fn try_swapping_with_streaming_table( + projection: &ProjectionExec, + streaming_table: &StreamingTableExec, +) -> Result>> { + if !all_alias_free_columns(projection.expr()) { + return Ok(None); + } + + let streaming_table_projections = streaming_table + .projection() + .as_ref() + .map(|i| i.as_ref().to_vec()); + let new_projections = + new_projections_for_columns(projection, &streaming_table_projections); + + let mut lex_orderings = vec![]; + for lex_ordering in streaming_table.projected_output_ordering().into_iter() { + let mut orderings = vec![]; + for order in lex_ordering { + let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)? + else { + return Ok(None); + }; + orderings.push(PhysicalSortExpr { + expr: new_ordering, + options: order.options, + }); + } + lex_orderings.push(orderings); + } + + StreamingTableExec::try_new( + streaming_table.partition_schema().clone(), + streaming_table.partitions().clone(), + Some(&new_projections), + lex_orderings, + streaming_table.is_infinite(), + ) + .map(|e| Some(Arc::new(e) as _)) +} + /// Unifies `projection` with its input (which is also a [`ProjectionExec`]). -/// Two consecutive projections can always merge into a single projection unless -/// the [`update_expr`] function does not support one of the expression -/// types involved in the projection. +/// Two consecutive projections can always merge into a single projection. fn try_unifying_projections( projection: &ProjectionExec, child: &ProjectionExec, @@ -779,10 +824,6 @@ fn new_projections_for_columns( /// given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with /// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes /// `a@0`, but `b@2` results in `None` since the projection does not include `b`. -/// -/// If the expression contains a `PhysicalExpr` variant that this function does -/// not support, it will return `None`. An error can only be introduced if -/// `CaseExpr::try_new` returns an error. fn update_expr( expr: &Arc, projected_exprs: &[(Arc, String)], @@ -1102,10 +1143,11 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::ExecutionPlan; - use arrow_schema::{DataType, Field, Schema, SortOptions}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; use datafusion_common::config::ConfigOptions; use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics}; use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr, @@ -1115,8 +1157,11 @@ mod tests { PhysicalSortRequirement, ScalarFunctionExpr, }; use datafusion_physical_plan::joins::SymmetricHashJoinExec; + use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::union::UnionExec; + use itertools::Itertools; + #[test] fn test_update_matching_exprs() -> Result<()> { let exprs: Vec> = vec![ @@ -1575,6 +1620,119 @@ mod tests { Ok(()) } + #[test] + fn test_streaming_table_after_projection() -> Result<()> { + struct DummyStreamPartition { + schema: SchemaRef, + } + impl PartitionStream for DummyStreamPartition { + fn schema(&self) -> &SchemaRef { + &self.schema + } + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + unreachable!() + } + } + + let streaming_table = StreamingTableExec::try_new( + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])), + vec![Arc::new(DummyStreamPartition { + schema: Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])), + }) as _], + Some(&vec![0_usize, 2, 4, 3]), + vec![ + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("e", 2)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + ], + vec![PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options: SortOptions::default(), + }], + ] + .into_iter(), + true, + )?; + let projection = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("d", 3)), "d".to_string()), + (Arc::new(Column::new("e", 2)), "e".to_string()), + (Arc::new(Column::new("a", 0)), "a".to_string()), + ], + Arc::new(streaming_table) as _, + )?) as _; + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let result = after_optimize + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + result.partition_schema(), + &Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])) + ); + assert_eq!( + result.projection().clone().unwrap().to_vec(), + vec![3_usize, 4, 0] + ); + assert_eq!( + result.projected_schema(), + &Schema::new(vec![ + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + Field::new("a", DataType::Int32, true), + ]) + ); + assert_eq!( + result.projected_output_ordering().into_iter().collect_vec(), + vec![ + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("e", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 2)), + options: SortOptions::default(), + }, + ], + vec![PhysicalSortExpr { + expr: Arc::new(Column::new("d", 0)), + options: SortOptions::default(), + }], + ] + ); + assert!(result.is_infinite()); + + Ok(()) + } + #[test] fn test_projection_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index b0eaa2b42f424..59819c6921fb4 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -26,6 +26,7 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; +use arrow_schema::Schema; use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; @@ -70,9 +71,9 @@ impl StreamingTableExec { ) -> Result { for x in partitions.iter() { let partition_schema = x.schema(); - if !schema.contains(partition_schema) { + if !schema.eq(partition_schema) { debug!( - "target schema does not contain partition schema. \ + "Target schema does not match with partition schema. \ Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}" ); return plan_err!("Mismatch between schema and batches"); @@ -92,6 +93,30 @@ impl StreamingTableExec { infinite, }) } + + pub fn partitions(&self) -> &Vec> { + &self.partitions + } + + pub fn partition_schema(&self) -> &SchemaRef { + self.partitions[0].schema() + } + + pub fn projection(&self) -> &Option> { + &self.projection + } + + pub fn projected_schema(&self) -> &Schema { + &self.projected_schema + } + + pub fn projected_output_ordering(&self) -> impl IntoIterator { + self.projected_output_ordering.clone() + } + + pub fn is_infinite(&self) -> bool { + self.infinite + } } impl std::fmt::Debug for StreamingTableExec {