Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 172 additions & 14 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//! projections one by one if the operator below is amenable to this. If a
//! projection reaches a source, it can even dissappear from the plan entirely.

use std::sync::Arc;

use super::output_requirements::OutputRequirementExec;
use super::PhysicalOptimizerRule;
use crate::datasource::physical_plan::CsvExec;
Expand All @@ -39,18 +41,17 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{Distribution, ExecutionPlan};

use arrow_schema::SchemaRef;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::JoinSide;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;

use itertools::Itertools;
use std::sync::Arc;

/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
/// remove or swap with its child.
Expand Down Expand Up @@ -135,6 +136,8 @@ pub fn remove_unnecessary_projections(
try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
try_swapping_with_sym_hash_join(projection, sym_join)?
} else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
try_swapping_with_streaming_table(projection, ste)?
} else {
// If the input plan of the projection is not one of the above, we
// conservatively assume that pushing the projection down may hurt.
Expand All @@ -149,8 +152,8 @@ pub fn remove_unnecessary_projections(
Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes))
}

/// Tries to swap `projection` with its input (`csv`). If possible, performs
/// the swap and returns [`CsvExec`] as the top plan. Otherwise, returns `None`.
/// Tries to embed `projection` to its input (`csv`). If possible, returns
/// [`CsvExec`] as the top plan. Otherwise, returns `None`.
fn try_swapping_with_csv(
projection: &ProjectionExec,
csv: &CsvExec,
Expand All @@ -174,8 +177,8 @@ fn try_swapping_with_csv(
})
}

/// Tries to swap `projection` with its input (`memory`). If possible, performs
/// the swap and returns [`MemoryExec`] as the top plan. Otherwise, returns `None`.
/// Tries to embed `projection` to its input (`memory`). If possible, returns
/// [`MemoryExec`] as the top plan. Otherwise, returns `None`.
fn try_swapping_with_memory(
projection: &ProjectionExec,
memory: &MemoryExec,
Expand All @@ -197,10 +200,52 @@ fn try_swapping_with_memory(
.transpose()
}

/// Tries to embed `projection` to its input (`streaming table`).
/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
/// returns `None`.
fn try_swapping_with_streaming_table(
projection: &ProjectionExec,
streaming_table: &StreamingTableExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if !all_alias_free_columns(projection.expr()) {
return Ok(None);
}

let streaming_table_projections = streaming_table
.projection()
.as_ref()
.map(|i| i.as_ref().to_vec());
let new_projections =
new_projections_for_columns(projection, &streaming_table_projections);

let mut lex_orderings = vec![];
for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
let mut orderings = vec![];
for order in lex_ordering {
let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
else {
return Ok(None);
};
orderings.push(PhysicalSortExpr {
expr: new_ordering,
options: order.options,
});
}
lex_orderings.push(orderings);
}

StreamingTableExec::try_new(
streaming_table.partition_schema().clone(),
streaming_table.partitions().clone(),
Some(&new_projections),
lex_orderings,
streaming_table.is_infinite(),
)
.map(|e| Some(Arc::new(e) as _))
}

/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
/// Two consecutive projections can always merge into a single projection unless
/// the [`update_expr`] function does not support one of the expression
/// types involved in the projection.
/// Two consecutive projections can always merge into a single projection.
fn try_unifying_projections(
projection: &ProjectionExec,
child: &ProjectionExec,
Expand Down Expand Up @@ -779,10 +824,6 @@ fn new_projections_for_columns(
/// given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with
/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
///
/// If the expression contains a `PhysicalExpr` variant that this function does
/// not support, it will return `None`. An error can only be introduced if
/// `CaseExpr::try_new` returns an error.
fn update_expr(
expr: &Arc<dyn PhysicalExpr>,
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
Expand Down Expand Up @@ -1102,10 +1143,11 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;

use arrow_schema::{DataType, Field, Schema, SortOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
Expand All @@ -1115,8 +1157,11 @@ mod tests {
PhysicalSortRequirement, ScalarFunctionExpr,
};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::union::UnionExec;

use itertools::Itertools;

#[test]
fn test_update_matching_exprs() -> Result<()> {
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Expand Down Expand Up @@ -1575,6 +1620,119 @@ mod tests {
Ok(())
}

#[test]
fn test_streaming_table_after_projection() -> Result<()> {
struct DummyStreamPartition {
schema: SchemaRef,
}
impl PartitionStream for DummyStreamPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
unreachable!()
}
}

let streaming_table = StreamingTableExec::try_new(
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
])),
vec![Arc::new(DummyStreamPartition {
schema: Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
])),
}) as _],
Some(&vec![0_usize, 2, 4, 3]),
vec![
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("e", 2)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: SortOptions::default(),
},
],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("d", 3)),
options: SortOptions::default(),
}],
]
.into_iter(),
true,
)?;
let projection = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("d", 3)), "d".to_string()),
(Arc::new(Column::new("e", 2)), "e".to_string()),
(Arc::new(Column::new("a", 0)), "a".to_string()),
],
Arc::new(streaming_table) as _,
)?) as _;

let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let result = after_optimize
.as_any()
.downcast_ref::<StreamingTableExec>()
.unwrap();
assert_eq!(
result.partition_schema(),
&Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
]))
);
assert_eq!(
result.projection().clone().unwrap().to_vec(),
vec![3_usize, 4, 0]
);
assert_eq!(
result.projected_schema(),
&Schema::new(vec![
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
Field::new("a", DataType::Int32, true),
])
);
assert_eq!(
result.projected_output_ordering().into_iter().collect_vec(),
vec![
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("e", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 2)),
options: SortOptions::default(),
},
],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("d", 0)),
options: SortOptions::default(),
}],
]
);
assert!(result.is_infinite());

Ok(())
}

#[test]
fn test_projection_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
Expand Down
29 changes: 27 additions & 2 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

use arrow::datatypes::SchemaRef;
use arrow_schema::Schema;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
Expand Down Expand Up @@ -70,9 +71,9 @@ impl StreamingTableExec {
) -> Result<Self> {
for x in partitions.iter() {
let partition_schema = x.schema();
if !schema.contains(partition_schema) {
if !schema.eq(partition_schema) {
debug!(
"target schema does not contain partition schema. \
"Target schema does not match with partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return plan_err!("Mismatch between schema and batches");
Expand All @@ -92,6 +93,30 @@ impl StreamingTableExec {
infinite,
})
}

pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
&self.partitions
}

pub fn partition_schema(&self) -> &SchemaRef {
self.partitions[0].schema()
}

pub fn projection(&self) -> &Option<Arc<[usize]>> {
&self.projection
}

pub fn projected_schema(&self) -> &Schema {
&self.projected_schema
}

pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
self.projected_output_ordering.clone()
}

pub fn is_infinite(&self) -> bool {
self.infinite
}
}

impl std::fmt::Debug for StreamingTableExec {
Expand Down