Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,15 +1287,16 @@ impl TableProvider for DataFrameTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}

if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
Expand Down
27 changes: 14 additions & 13 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ impl TableProvider for ViewTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = if let Some(projection) = projection {
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
let plan = self.logical_plan().clone();
let mut plan = LogicalPlanBuilder::from(plan);

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

let mut plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
(0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
self.logical_plan().clone()
plan
} else {
let fields: Vec<Expr> = projection
.iter()
Expand All @@ -123,19 +131,11 @@ impl TableProvider for ViewTable {
)
})
.collect();
LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?
plan.project(fields)?
}
} else {
self.logical_plan().clone()
plan
};
let mut plan = LogicalPlanBuilder::from(plan);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
Expand Down Expand Up @@ -439,6 +439,7 @@ mod tests {
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;

// Filters all the way to Parquet
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
Expand Down
11 changes: 10 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::expr_rewriter::{
};
use crate::type_coercion::binary::comparison_coercion;
use crate::utils::{columnize_expr, compare_sort_expr};
use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
use crate::{
and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp,
};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
Expand Down Expand Up @@ -1402,6 +1404,13 @@ impl TableSource for LogicalTableSource {
fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<crate::TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}

/// Create a [`LogicalPlan::Unnest`] plan
Expand Down
Loading