164 changes: 124 additions & 40 deletions rust/datafusion/src/execution/context.rs
@@ -36,8 +36,7 @@ use crate::execution::physical_plan::common;
use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::{
Alias, Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr,
Sum,
Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::limit::LimitExec;
@@ -51,10 +50,11 @@ use crate::execution::physical_plan::sort::{SortExec, SortOptions};
use crate::execution::physical_plan::udf::{ScalarFunction, ScalarFunctionExpr};
use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
use crate::execution::table_impl::TableImpl;
use crate::logicalplan::*;
use crate::logicalplan::{
Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::resolve_columns::ResolveColumnsRule;
use crate::optimizer::type_coercion::TypeCoercionRule;
use crate::sql::parser::{DFASTNode, DFParser, FileType};
use crate::sql::planner::{SchemaProvider, SqlToRel};
@@ -67,6 +67,15 @@ pub struct ExecutionContext {
scalar_functions: HashMap<String, Box<ScalarFunction>>,
}

/// Combine a pair of `Result`s into a `Result` of a pair, returning the first
/// error encountered if either side failed.
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
match value {
(Ok(e), Ok(e1)) => Ok((e, e1)),
(Err(e), Ok(_)) => Err(e),
(Ok(_), Err(e1)) => Err(e1),
(Err(e), Err(_)) => Err(e),
}
}

impl ExecutionContext {
/// Create a new execution context for in-memory queries
pub fn new() -> Self {
@@ -275,7 +284,6 @@ impl ExecutionContext {
/// Optimize the logical plan by applying optimizer rules
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let rules: Vec<Box<dyn OptimizerRule>> = vec![
Box::new(ResolveColumnsRule::new()),
Box::new(ProjectionPushDown::new()),
Box::new(TypeCoercionRule::new(&self.scalar_functions)),
];
Expand Down Expand Up @@ -356,7 +364,12 @@ impl ExecutionContext {
let input_schema = input.as_ref().schema().clone();
let runtime_expr = expr
.iter()
.map(|e| self.create_physical_expr(e, &input_schema))
.map(|e| {
tuple_err((
self.create_physical_expr(e, &input_schema),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
}
@@ -370,17 +383,30 @@ impl ExecutionContext {
let input = self.create_physical_plan(input, batch_size)?;
let input_schema = input.as_ref().schema().clone();

let group_expr = group_expr
let groups = group_expr
.iter()
.map(|e| self.create_physical_expr(e, &input_schema))
.map(|e| {
tuple_err((
self.create_physical_expr(e, &input_schema),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
let aggr_expr = aggr_expr
let aggregates = aggr_expr
.iter()
.map(|e| self.create_aggregate_expr(e, &input_schema))
.map(|e| {
tuple_err((
self.create_aggregate_expr(e, &input_schema),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;

let initial_aggr =
HashAggregateExec::try_new(group_expr, aggr_expr, input)?;
let initial_aggr = HashAggregateExec::try_new(
groups.clone(),
aggregates.clone(),
input,
)?;

let schema = initial_aggr.schema();
let partitions = initial_aggr.partitions()?;
@@ -389,13 +415,27 @@ impl ExecutionContext {
return Ok(Arc::new(initial_aggr));
}

let (final_group, final_aggr) = initial_aggr.make_final_expr();

let merge = Arc::new(MergeExec::new(schema.clone(), partitions));

// construct the expressions for the final aggregation
let (final_group, final_aggr) = initial_aggr.make_final_expr(
groups.iter().map(|x| x.1.clone()).collect(),
aggregates.iter().map(|x| x.1.clone()).collect(),
);

// construct a second aggregation, keeping the output column names equal to
// those of the first aggregation and pairing each expression with its
// respective aggregate
Ok(Arc::new(HashAggregateExec::try_new(
final_group,
final_aggr,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
final_aggr
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
.collect(),
merge,
)?))
}
@@ -455,12 +495,11 @@ impl ExecutionContext {
input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
match e {
Expr::Alias(expr, name) => {
let expr = self.create_physical_expr(expr, input_schema)?;
Ok(Arc::new(Alias::new(expr, &name)))
}
Expr::Column(i) => {
Ok(Arc::new(Column::new(*i, &input_schema.field(*i).name())))
Expr::Alias(expr, ..) => Ok(self.create_physical_expr(expr, input_schema)?),
[Review thread]

Member: It looks like we're losing the alias name here?

Member: So we just need to fix this so that the physical expression is created using the aliased name.

Member Author (@jorgecarleitao): No, for the same reason: the physical plan does not care about names anymore, and aliases are only about naming.

Member Author (@jorgecarleitao), Jul 22, 2020: fyi my comment was written before I read your second comment; these pages are definitely not thread safe!

Member: Ok, so the schema for these physical operators is based on the logical schema? I guess I just want to be sure that the alias name is preserved in the query results. A unit test demonstrating this would be good.

Member Author (@jorgecarleitao), Jul 22, 2020: (I wrote my other comment because I became uncertain about my claim and wanted to double-check ^_^: yes, good to have this triple-checked; this is a big change.) Let's see:

  1. The schema of the logical plan is built on the column names as per this line, which is then passed to the physical plan as per the line under discussion. So, it should be the case...
  2. Line 1089 of context.rs IMO demonstrates this: we alias an expression in the logical plan, and the physical plan's schema field for that column is the alias itself.

Member: Thanks. I just pulled your branch and modified one of the integration tests just to be sure, and it looks good. This really helps simplify some things, thanks! Could you rebase?

Member Author (@jorgecarleitao): :) perfect. I have rebased this against the latest master.
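A minimal sketch of the unit test requested above, assuming the create_ctx helper and the c1/c2 test table used elsewhere in this file (the test name and the alias are hypothetical):

#[test]
fn aliased_projection_preserves_field_name() -> Result<()> {
    let tmp_dir = TempDir::new("alias")?;
    let ctx = create_ctx(&tmp_dir, 1)?;

    // alias a column in the logical plan
    let table = ctx.table("test")?;
    let plan = LogicalPlanBuilder::from(&table.to_logical_plan())
        .project(vec![col("c2").alias("renamed")])?
        .build()?;

    let plan = ctx.optimize(&plan)?;
    let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;

    // the physical schema should carry the alias, not the original column name
    assert_eq!(physical_plan.schema().field(0).name(), "renamed");
    Ok(())
}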
Expr::Column(name) => {
// check that the name exists in the input schema
input_schema.field_with_name(&name)?;
Ok(Arc::new(Column::new(name)))
}
Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
Expand All @@ -484,7 +523,6 @@ impl ExecutionContext {
physical_args.push(self.create_physical_expr(e, input_schema)?);
}
Ok(Arc::new(ScalarFunctionExpr::new(
name,
Box::new(f.fun.clone()),
physical_args,
return_type,
Expand Down Expand Up @@ -650,6 +688,7 @@ mod tests {
use super::*;
use crate::datasource::MemTable;
use crate::execution::physical_plan::udf::ScalarUdf;
use crate::logicalplan::{aggregate_expr, col, scalar_function};
use crate::test;
use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::add;
@@ -666,10 +705,12 @@ mod tests {
// there should be one batch per partition
assert_eq!(results.len(), partition_count);

// each batch should contain 2 columns and 10 rows
// each batch should contain 2 columns and 10 rows with correct field names
for batch in &results {
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 10);

assert_eq!(field_names(batch), vec!["c1", "c2"]);
}

Ok(())
@@ -706,7 +747,7 @@ mod tests {

let table = ctx.table("test")?;
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
.project(vec![Expr::UnresolvedColumn("c2".to_string())])?
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
@@ -725,7 +766,7 @@ mod tests {
_ => assert!(false, "expect optimized_plan to be projection"),
}

let expected = "Projection: #0\
let expected = "Projection: #c2\
\n TableScan: test projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);

@@ -747,19 +788,19 @@ mod tests {
let tmp_dir = TempDir::new("execute")?;
let ctx = create_ctx(&tmp_dir, 1)?;

let schema = Arc::new(Schema::new(vec![Field::new(
"state",
DataType::Utf8,
false,
)]));
let schema = ctx.datasources.get("test").unwrap().schema();
assert_eq!(schema.field_with_name("c1")?.is_nullable(), false);

let plan = LogicalPlanBuilder::scan("default", "test", schema.as_ref(), None)?
.project(vec![col("state")])?
.project(vec![col("c1")])?
.build()?;

let plan = ctx.optimize(&plan)?;
let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
assert_eq!(physical_plan.schema().field(0).is_nullable(), false);
assert_eq!(
physical_plan.schema().field_with_name("c1")?.is_nullable(),
false
);
Ok(())
}

@@ -783,7 +824,7 @@ mod tests {
projection: None,
projected_schema: Box::new(schema.clone()),
})
.project(vec![Expr::UnresolvedColumn("b".to_string())])?
.project(vec![col("b")])?
.build()?;
assert_fields_eq(&plan, vec!["b"]);

@@ -804,7 +845,7 @@ mod tests {
_ => assert!(false, "expect optimized_plan to be projection"),
}

let expected = "Projection: #0\
let expected = "Projection: #b\
\n InMemoryScan: projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);

@@ -844,6 +885,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["SUM(c1)", "SUM(c2)"]);

let expected: Vec<&str> = vec!["60,220"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -858,6 +902,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["AVG(c1)", "AVG(c2)"]);

let expected: Vec<&str> = vec!["1.5,5.5"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -872,6 +919,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["MAX(c1)", "MAX(c2)"]);

let expected: Vec<&str> = vec!["3,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -886,6 +936,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["MIN(c1)", "MIN(c2)"]);

let expected: Vec<&str> = vec!["0,1"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -900,6 +953,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["c1", "SUM(c2)"]);

let expected: Vec<&str> = vec!["0,55", "1,55", "2,55", "3,55"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -914,6 +970,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["c1", "AVG(c2)"]);

let expected: Vec<&str> = vec!["0,5.5", "1,5.5", "2,5.5", "3,5.5"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -928,6 +987,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["c1", "MAX(c2)"]);

let expected: Vec<&str> = vec!["0,10", "1,10", "2,10", "3,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -942,6 +1004,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["c1", "MIN(c2)"]);

let expected: Vec<&str> = vec!["0,1", "1,1", "2,1", "3,1"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -956,6 +1021,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["COUNT(c1)", "COUNT(c2)"]);

let expected: Vec<&str> = vec!["10,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -969,6 +1037,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["COUNT(c1)", "COUNT(c2)"]);

let expected: Vec<&str> = vec!["40,40"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -982,6 +1053,9 @@ mod tests {
assert_eq!(results.len(), 1);

let batch = &results[0];

assert_eq!(field_names(batch), vec!["c1", "COUNT(c2)"]);

let expected = vec!["0,10", "1,10", "2,10", "3,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
@@ -995,16 +1069,16 @@ mod tests {
let ctx = create_ctx(&tmp_dir, 1)?;

let schema = Arc::new(Schema::new(vec![
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::UInt32, false),
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
]));

let plan = LogicalPlanBuilder::scan("default", "test", schema.as_ref(), None)?
.aggregate(
vec![col("state")],
[Review comment] Member Author (@jorgecarleitao), Jul 18, 2020: This change was necessary because the file's schema is c1, c2, not state, salary. We were able to get away with this before because, since the names did not mean anything, we could read a file with a given schema using another schema's field names, as long as the indexing was correct. I believe that this should not be possible by design, as it introduces situations that are non-trivial to debug.
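A hedged illustration of that point (hypothetical snippet, not part of the diff): with name-based resolution, referencing a column that is not in the file's schema fails up front instead of silently reading another column by position.

use arrow::datatypes::{DataType, Field, Schema};

let schema = Schema::new(vec![
    Field::new("c1", DataType::Utf8, false),
    Field::new("c2", DataType::UInt32, false),
]);

// "state" is not a field of this schema, so name resolution errors immediately
assert!(schema.field_with_name("state").is_err());
// a real column resolves fine
assert!(schema.field_with_name("c1").is_ok());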
vec![aggregate_expr("SUM", col("salary"), DataType::UInt32)],
vec![col("c1")],
vec![aggregate_expr("SUM", col("c2"), DataType::UInt32)],
)?
.project(vec![col("state"), col_index(1).alias("total_salary")])?
.project(vec![col("c1"), col("SUM(c2)").alias("total_salary")])?
.build()?;

let plan = ctx.optimize(&plan)?;
@@ -1131,6 +1205,7 @@ mod tests {
let batch = &result[0];
assert_eq!(3, batch.num_columns());
assert_eq!(4, batch.num_rows());
assert_eq!(field_names(batch), vec!["a", "b", "my_add(a,b)"]);

let a = batch
.column(0)
@@ -1166,6 +1241,15 @@ mod tests {
ctx.collect(physical_plan.as_ref())
}

fn field_names(result: &RecordBatch) -> Vec<String> {
result
.schema()
.fields()
.iter()
.map(|x| x.name().clone())
.collect::<Vec<String>>()
}

/// Execute SQL and return results
fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
let tmp_dir = TempDir::new("execute")?;