-
Notifications
You must be signed in to change notification settings - Fork 1.9k
add some pre-check for LogicalPlanBuilder #2054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| /// Build the plan | ||
| pub fn build(&self) -> Result<LogicalPlan> { | ||
| Ok(self.plan.clone()) | ||
| if let Some(err) = check_plan_invalid(&self.plan) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend to return Result instead of Option<Err>.
Then, We can handle error like this.
match err {
Ok() => Ok(s),
Err() => Err(e),
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean Result<LogicalPlan, DataFusionError>?
Hm, this check function just finds the err, if it returns Ok(plan), it's a little bit strange🤔.
Another reason is we must write return root_plan.clone() in each arm of the match if use Result.
If you mean Result<(), DataFusionError>, I think it's the same as Option, right?
Or if I fail to get your idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you mean Result<(), DataFusionError>, I think it's the same as Option, right?
Yes, Essentially they are all enumeration types
A result represent either success/ Ok or failure/ Err. It's more elegant and reasonable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can write this as well:
check_plan_invalid(&self.plan)?;Which will return the error
| } | ||
|
|
||
| /// do some checks for exprs in a logical plan | ||
| fn check_invalid_expr(expr: &Expr, schema: &DFSchemaRef) -> Option<DataFusionError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| .iter() | ||
| .filter_map(|e| check_invalid_expr(e, schema)) | ||
| .next() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, if use Result, code here looks like:
for expr in exprs {
let r = check_invalid_expr(expr, schema);
if r.is_err() {
return r;
}
}
Ok(())
or
exprs
.iter()
.map(|e| check_invalid_expr(e, schema))
.fold_ok((), |start, v| start) // I dont like it
I dont like the fold_ok((), |start, v| start)
I don't know how to make it more elegant, please give more suggestions.
@jackwener @alamb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about:
for expr in exprs {
check_invalid_expr(expr, schema)?
}
Ok(())You can also use https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.try_for_each
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
I applied the suggested changes from @jackwener which is same as yours.
jackwener
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checkout this PR locally and rewrite use Result.
It can compile successfully. You just apply those suggestion and continue working base on it.
| /// check whether the logical plan we are building is valid | ||
| fn check_plan_invalid(plan: &LogicalPlan) -> Option<DataFusionError> { | ||
| match plan { | ||
| LogicalPlan::Projection(Projection { expr, input, .. }) | ||
| | LogicalPlan::Sort(Sort { expr, input }) | ||
| | LogicalPlan::Window(Window { | ||
| window_expr: expr, | ||
| input, | ||
| .. | ||
| }) => check_plan_invalid(input) | ||
| .or_else(|| check_any_invalid_expr(expr, input.schema())), | ||
|
|
||
| LogicalPlan::Filter(Filter { | ||
| predicate: expr, | ||
| input, | ||
| }) => { | ||
| check_plan_invalid(input).or_else(|| check_invalid_expr(expr, input.schema())) | ||
| } | ||
|
|
||
| LogicalPlan::Aggregate(Aggregate { | ||
| input, | ||
| group_expr, | ||
| aggr_expr, | ||
| .. | ||
| }) => { | ||
| let schema = input.schema(); | ||
| check_plan_invalid(input) | ||
| .or_else(|| check_any_invalid_expr(group_expr, schema)) | ||
| .or_else(|| check_any_invalid_expr(aggr_expr, schema)) | ||
| } | ||
|
|
||
| LogicalPlan::Join(Join { left, right, .. }) | ||
| | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { | ||
| check_plan_invalid(left).or_else(|| check_plan_invalid(right)) | ||
| } | ||
|
|
||
| LogicalPlan::Repartition(Repartition { input, .. }) | ||
| | LogicalPlan::Limit(Limit { input, .. }) | ||
| | LogicalPlan::Explain(Explain { plan: input, .. }) | ||
| | LogicalPlan::Analyze(Analyze { input, .. }) => check_plan_invalid(input), | ||
|
|
||
| LogicalPlan::Union(Union { inputs, .. }) => { | ||
| inputs.iter().filter_map(check_plan_invalid).next() | ||
| } | ||
|
|
||
| LogicalPlan::TableScan(TableScan { | ||
| table_name: _, | ||
| source, | ||
| projection, | ||
| projected_schema, | ||
| filters, | ||
| limit: _, | ||
| }) => { | ||
| if let Some(projection) = projection { | ||
| if projection.len() > projected_schema.fields().len() { | ||
| return Some(DataFusionError::Plan( | ||
| "projection contains columns that doesnt belong to projected schema" | ||
| .to_owned(), | ||
| )); | ||
| } | ||
| } | ||
| let schema = &source.schema().to_dfschema_ref().ok()?; | ||
| check_any_invalid_expr(filters, schema) | ||
| } | ||
|
|
||
| _ => None, | ||
| } | ||
| } | ||
|
|
||
| /// find first error in the exprs | ||
| fn check_any_invalid_expr( | ||
| exprs: &[Expr], | ||
| schema: &DFSchemaRef, | ||
| ) -> Option<DataFusionError> { | ||
| exprs | ||
| .iter() | ||
| .filter_map(|e| check_invalid_expr(e, schema)) | ||
| .next() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// check whether the logical plan we are building is valid | |
| fn check_plan_invalid(plan: &LogicalPlan) -> Option<DataFusionError> { | |
| match plan { | |
| LogicalPlan::Projection(Projection { expr, input, .. }) | |
| | LogicalPlan::Sort(Sort { expr, input }) | |
| | LogicalPlan::Window(Window { | |
| window_expr: expr, | |
| input, | |
| .. | |
| }) => check_plan_invalid(input) | |
| .or_else(|| check_any_invalid_expr(expr, input.schema())), | |
| LogicalPlan::Filter(Filter { | |
| predicate: expr, | |
| input, | |
| }) => { | |
| check_plan_invalid(input).or_else(|| check_invalid_expr(expr, input.schema())) | |
| } | |
| LogicalPlan::Aggregate(Aggregate { | |
| input, | |
| group_expr, | |
| aggr_expr, | |
| .. | |
| }) => { | |
| let schema = input.schema(); | |
| check_plan_invalid(input) | |
| .or_else(|| check_any_invalid_expr(group_expr, schema)) | |
| .or_else(|| check_any_invalid_expr(aggr_expr, schema)) | |
| } | |
| LogicalPlan::Join(Join { left, right, .. }) | |
| | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { | |
| check_plan_invalid(left).or_else(|| check_plan_invalid(right)) | |
| } | |
| LogicalPlan::Repartition(Repartition { input, .. }) | |
| | LogicalPlan::Limit(Limit { input, .. }) | |
| | LogicalPlan::Explain(Explain { plan: input, .. }) | |
| | LogicalPlan::Analyze(Analyze { input, .. }) => check_plan_invalid(input), | |
| LogicalPlan::Union(Union { inputs, .. }) => { | |
| inputs.iter().filter_map(check_plan_invalid).next() | |
| } | |
| LogicalPlan::TableScan(TableScan { | |
| table_name: _, | |
| source, | |
| projection, | |
| projected_schema, | |
| filters, | |
| limit: _, | |
| }) => { | |
| if let Some(projection) = projection { | |
| if projection.len() > projected_schema.fields().len() { | |
| return Some(DataFusionError::Plan( | |
| "projection contains columns that doesnt belong to projected schema" | |
| .to_owned(), | |
| )); | |
| } | |
| } | |
| let schema = &source.schema().to_dfschema_ref().ok()?; | |
| check_any_invalid_expr(filters, schema) | |
| } | |
| _ => None, | |
| } | |
| } | |
| /// find first error in the exprs | |
| fn check_any_invalid_expr( | |
| exprs: &[Expr], | |
| schema: &DFSchemaRef, | |
| ) -> Option<DataFusionError> { | |
| exprs | |
| .iter() | |
| .filter_map(|e| check_invalid_expr(e, schema)) | |
| .next() | |
| } | |
| /// check whether the logical plan we are building is valid | |
| fn check_plan_invalid(plan: &LogicalPlan) -> Result<()> { | |
| match plan { | |
| LogicalPlan::Projection(Projection { expr, input, .. }) | |
| | LogicalPlan::Sort(Sort { expr, input }) | |
| | LogicalPlan::Window(Window { | |
| window_expr: expr, | |
| input, | |
| .. | |
| }) => check_plan_invalid(input) | |
| .or_else(|_| check_any_invalid_expr(expr, input.schema())), | |
| LogicalPlan::Filter(Filter { | |
| predicate: expr, | |
| input, | |
| }) => check_plan_invalid(input) | |
| .or_else(|_| check_invalid_expr(expr, input.schema())), | |
| LogicalPlan::Aggregate(Aggregate { | |
| input, | |
| group_expr, | |
| aggr_expr, | |
| .. | |
| }) => { | |
| let schema = input.schema(); | |
| check_plan_invalid(input) | |
| .or_else(|_| check_any_invalid_expr(group_expr, schema)) | |
| .or_else(|_| check_any_invalid_expr(aggr_expr, schema)) | |
| } | |
| LogicalPlan::Join(Join { left, right, .. }) | |
| | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { | |
| check_plan_invalid(left).or_else(|_| check_plan_invalid(right)) | |
| } | |
| LogicalPlan::Repartition(Repartition { input, .. }) | |
| | LogicalPlan::Limit(Limit { input, .. }) | |
| | LogicalPlan::Explain(Explain { plan: input, .. }) | |
| | LogicalPlan::Analyze(Analyze { input, .. }) => check_plan_invalid(input), | |
| LogicalPlan::Union(Union { inputs, .. }) => { | |
| inputs.iter().map(check_plan_invalid).collect() | |
| } | |
| LogicalPlan::TableScan(TableScan { | |
| table_name: _, | |
| source, | |
| projection, | |
| projected_schema, | |
| filters, | |
| limit: _, | |
| }) => { | |
| if let Some(projection) = projection { | |
| if projection.len() > projected_schema.fields().len() { | |
| return Err(DataFusionError::Plan( | |
| "projection contains columns that doesnt belong to projected schema" | |
| .to_owned(), | |
| )); | |
| } | |
| } | |
| let schema = &source.schema().to_dfschema_ref()?; | |
| check_any_invalid_expr(filters, schema) | |
| } | |
| _ => Ok(()), | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry for the late response! Thanks @jackwener , the code looks better!
|
Sorry for the reply a little late. |
|
Thanks for your detailed suggestions! @jackwener |
|
Thank you @jackwener . The code looks better! |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks -- I think this is looking good @doki23
The only thing I would really like to see before merging is a test for the wildcard check. The others comments are simply nice to have.
| } | ||
|
|
||
| /// do some checks for exprs in a logical plan | ||
| fn check_invalid_expr(expr: &Expr, schema: &DFSchemaRef) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I may have missed it, but as written I don't think this recurses into the expr -- so while it will find nonsense like
NOT "foo"
I don't think it would find
A = 5 OR (NOT "foo")
We could use an ExpressionVisitor for thishttps://github.com/apache/arrow-datafusion/blob/634252b44cec8dc904e48926d0aa54feeb4d48af/datafusion/src/logical_plan/expr_visitor.rs#L32-L47
Here is an example of using ExpressionVisitor for something similar:
https://github.com/apache/arrow-datafusion/blob/2d6addd4c435123e934ce04564d8cc77bf101c37/datafusion/src/datasource/listing/helpers.rs#L55-L119
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree that visitor pattern is better, I'll try that.
| } | ||
| } | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also please add a test for using wildcards incorrectly -- like SELECT .. GROUP BY * and SELECT .. GROUP BY foo*?
|
|
||
| #[test] | ||
| fn select_neg_filter() { | ||
| let sql = "SELECT id, first_name, last_name \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
| expand_wildcard(&qualifier_schema, plan) | ||
| } | ||
|
|
||
| /// check whether the logical plan we are building is valid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a follow on PR, it would be awesome to encapsulate this LogicalPlan tree walking code into some sort of visitor.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
|
I add pre-check after optimization. Although there's no unified validation framework, but it seems that it's always valid after optimization😂. So this pr may be unnecessary. I'm going to close it temporarily if there're no other suggestions or counter examples. |
|
😂 |
Which issue does this PR close?
Closes #2041 .
Rationale for this change
do some check before building a logical plan