@zhengshiJ zhengshiJ commented Aug 3, 2022

…put them into the query tree

Proposed changes

Issue Number:

Problem summary

1. Convert subqueries to Apply nodes.
2. Convert Apply nodes to ordinary joins.

Detailed design:

Three kinds of subquery expressions are currently handled: scalarSubquery, inSubquery, and exists. A scalarSubquery is a subquery whose result is exactly one row and one column.

Subquery replacement

before:
scalarSubquery:  filter(t1.a = scalarSubquery(output b));
inSubquery:  filter(inSubquery);   inSubquery = (t1.a in select ***);
exists:  filter(exists);   exists = (select ***);

after:
scalarSubquery:  filter(t1.a = b);
inSubquery:  filter(True);
exists:  filter(True);
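
The before/after table above can be read as a small mapping from subquery kind to what remains in the filter. The following is only a minimal sketch of that mapping; `SubqueryKindSketch`, `SubqueryReplacementSketch`, and `replacementFor` are hypothetical names invented for illustration and are not the classes used in this PR.

```java
import java.util.Objects;

/** Hypothetical stand-ins for the three subquery expression kinds named above. */
enum SubqueryKindSketch { SCALAR, IN, EXISTS }

final class SubqueryReplacementSketch {
    private SubqueryReplacementSketch() {
    }

    /**
     * Returns the text left in the filter once the subquery has been moved out of it.
     * outputSlot is only consulted for scalar subqueries.
     */
    static String replacementFor(SubqueryKindSketch kind, String outputSlot) {
        switch (kind) {
            case SCALAR:
                // filter(t1.a = scalarSubquery(output b)) becomes filter(t1.a = b)
                return Objects.requireNonNull(outputSlot, "scalar subquery needs an output slot");
            case IN:
            case EXISTS:
                // filter(inSubquery) and filter(exists) both become filter(True)
                return "True";
            default:
                throw new IllegalArgumentException("unsupported subquery kind: " + kind);
        }
    }
}
```

For example, `replacementFor(SubqueryKindSketch.SCALAR, "b")` yields the slot `b`, while the other two kinds collapse to `True`.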

Subquery Transformation Rules

PushApplyUnderFilter
 * before:
 *             Apply
 *          /              \
 * Input(output:b)    Filter(Correlated predicate/UnCorrelated predicate)
 *
 * after:
 *          Filter(Correlated predicate)
 *                      |
 *                  Apply
 *                /            \
 *      Input(output:b)    Filter(UnCorrelated predicate)
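
The core of PushApplyUnderFilter as drawn above is splitting the filter's conjuncts into those that reference the outer input and those that do not. A minimal sketch of that split follows, assuming a conjunct can be modeled as the set of slot names it references; `ConjunctSplitSketch` and its nested types are hypothetical and do not reflect the actual rule implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class ConjunctSplitSketch {
    /** A conjunct reduced to its text and the slot names it references (hypothetical model). */
    static final class Conjunct {
        final String text;
        final Set<String> slots;

        Conjunct(String text, Set<String> slots) {
            this.text = text;
            this.slots = slots;
        }
    }

    /** Result holder: correlated conjuncts end up above the Apply, the rest stay below it. */
    static final class Split {
        final List<Conjunct> correlated = new ArrayList<>();
        final List<Conjunct> uncorrelated = new ArrayList<>();
    }

    static Split split(List<Conjunct> conjuncts, Set<String> outerSlots) {
        Split result = new Split();
        for (Conjunct conjunct : conjuncts) {
            // A conjunct is correlated when it references at least one slot of the outer input.
            if (Collections.disjoint(conjunct.slots, outerSlots)) {
                result.uncorrelated.add(conjunct);
            } else {
                result.correlated.add(conjunct);
            }
        }
        return result;
    }
}
```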
PushApplyUnderProject
 * before:
 *            Apply
 *         /              \
 * Input(output:b)    Project(output:a)
 *
 * after:
 *          Project(b,(if the Subquery is Scalar add 'a' as the output column))
 *          /               \
 * Input(output:b)      Apply
ApplyPullFilterOnAgg
 * before:
 *             Apply
 *          /              \
 * Input(output:b)    agg(output:fn,c; group by:null)
 *                              |
 *              Filter(Correlated predicate(Input.e = this.f)/UnCorrelated predicate)
 *
 * after:
 *          Apply(Correlated predicate(Input.e = this.f))
 *         /              \
 * Input(output:b)    agg(output:fn,this.f; group by:this.f)
 *                              |
 *                    Filter(UnCorrelated predicate)
ApplyPullFilterOnProjectUnderAgg
 * before:
 *              apply
 *         /              \
 * Input(output:b)        agg
 *                         |
 *                  Project(output:a)
 *                         |
 *              Filter(correlated predicate(Input.e = this.f)/Unapply predicate)
 *                          |
 *                         child
 *
 * after:
 *              apply
 *         /              \
 * Input(output:b)        agg
 *                         |
 *              Filter(correlated predicate(Input.e = this.f)/Unapply predicate)
 *                         |
 *                  Project(output:a,this.f, Unapply predicate(slots))
 *                          |
 *                         child

ScalarToJoin
 * UnCorrelated -> CROSS_JOIN
 * Correlated -> LEFT_OUTER_JOIN
InToJoin
 * Not In -> LEFT_ANTI_JOIN
 * In -> LEFT_SEMI_JOIN
existsToJoin
 * Exists
 *    Correlated -> LEFT_SEMI_JOIN
 *      correlated                  LEFT_SEMI_JOIN(Correlated Predicate)
 *      /       \         -->       /           \
 *    input    queryPlan          input        queryPlan
 *
 *    UnCorrelated -> CROSS_JOIN(limit(1))
 *      uncorrelated                    CROSS_JOIN
 *      /           \          -->      /       \
 *    input        queryPlan          input    limit(1)
 *                                               |
 *                                             queryPlan
 *
 * Not Exists
 *    Correlated -> LEFT_ANTI_JOIN
 *      correlated                  LEFT_ANTI_JOIN(Correlated Predicate)
 *       /       \         -->       /           \
 *     input    queryPlan          input        queryPlan
 *
 *   UnCorrelated -> CROSS_JOIN(Count(*))
 *                                    Filter(count(*) = 0)
 *                                          |
 *         apply                       Cross_Join
 *      /       \         -->       /           \
 *    input    queryPlan          input       agg(output:count(*))
 *                                               |
 *                                             limit(1)
 *                                               |
 *                                             queryPlan
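
The ScalarToJoin, InToJoin, and existsToJoin lists above amount to a lookup from (subquery kind, negated, correlated) to a join type. The sketch below restates that lookup under the assumption that these three flags are enough to pick the join; the enum and method names are hypothetical, and the extra plan nodes (limit(1), the count(*) aggregate, and the filter) are only mentioned in comments.

```java
/** Hypothetical kinds of Apply node, one per subquery expression type. */
enum ApplyKindSketch { SCALAR, IN, EXISTS }

/** The join types named in the rules above. */
enum JoinKindSketch { CROSS_JOIN, LEFT_OUTER_JOIN, LEFT_SEMI_JOIN, LEFT_ANTI_JOIN }

final class ApplyToJoinSketch {
    private ApplyToJoinSketch() {
    }

    static JoinKindSketch joinFor(ApplyKindSketch kind, boolean negated, boolean correlated) {
        switch (kind) {
            case SCALAR:
                return correlated ? JoinKindSketch.LEFT_OUTER_JOIN : JoinKindSketch.CROSS_JOIN;
            case IN:
                return negated ? JoinKindSketch.LEFT_ANTI_JOIN : JoinKindSketch.LEFT_SEMI_JOIN;
            case EXISTS:
                if (!correlated) {
                    // Exists: the right side is wrapped in limit(1).
                    // Not Exists: the right side is agg(count(*)) over limit(1),
                    // with Filter(count(*) = 0) placed above the join.
                    return JoinKindSketch.CROSS_JOIN;
                }
                return negated ? JoinKindSketch.LEFT_ANTI_JOIN : JoinKindSketch.LEFT_SEMI_JOIN;
            default:
                throw new IllegalArgumentException("unsupported apply kind: " + kind);
        }
    }
}
```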

Checklist(Required)

  1. Does it affect the original behavior:
    • Yes
    • No
    • I don't know
  2. Have unit tests been added:
    • Yes
    • No
    • No Need
  3. Has documentation been added or modified:
    • Yes
    • No
    • No Need
  4. Does it need to update dependencies:
    • Yes
    • No
  5. Are there any changes that cannot be rolled back:
    • Yes (If Yes, please explain WHY)
    • No

Further comments

If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...

return logicalPlanToSubquery.get(plan);
}

public void setLogicalPlanToSubquery(LogicalPlan plan, SubqueryExpr subqueryExpr) {
Contributor

Suggested change
- public void setLogicalPlanToSubquery(LogicalPlan plan, SubqueryExpr subqueryExpr) {
+ public void setLogicalPlanToSubqueryIfAbsent(LogicalPlan plan, SubqueryExpr subqueryExpr) {

filter.child(), ctx.plannerContext));
})
),
RuleType.ANALYZE_AGGREGATE_SUBQUERY.build(
Contributor

Sort and aggregate should not contain a SubqueryExpr, should they? Also, Doris does not currently support subqueries in the select list,
so you don't need to process LogicalProject, LogicalAggregate, or LogicalSort.

);
}

private List<SubqueryExpr> extractSubquery(Expression expression) {
Contributor

There is no need to add the extractSubquery and getAllSubquery functions; TreeNode::collect is a shorter way to do this.

Suggested change
- private List<SubqueryExpr> extractSubquery(Expression expression) {
+ expression.collect(SubqueryExpr.class::isInstance)
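
To illustrate why a generic collect makes a dedicated extractSubquery helper unnecessary, here is a minimal sketch of a tree node with a predicate-driven `collect`. `SketchNode` and `SketchSubquery` are hypothetical classes written for this example; the signature of the real `TreeNode::collect` in Doris is not shown in this thread and may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical minimal tree node; not the Doris TreeNode interface. */
class SketchNode {
    final String name;
    final List<SketchNode> children;

    SketchNode(String name, SketchNode... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }

    /** Collects this node and every descendant that satisfies the predicate, in pre-order. */
    List<SketchNode> collect(Predicate<SketchNode> predicate) {
        List<SketchNode> matches = new ArrayList<>();
        collectInto(predicate, matches);
        return matches;
    }

    private void collectInto(Predicate<SketchNode> predicate, List<SketchNode> matches) {
        if (predicate.test(this)) {
            matches.add(this);
        }
        for (SketchNode child : children) {
            child.collectInto(predicate, matches);
        }
    }
}

/** Hypothetical marker subclass standing in for a subquery expression. */
final class SketchSubquery extends SketchNode {
    SketchSubquery(String name) {
        super(name);
    }
}
```

With this shape, `tree.collect(SketchSubquery.class::isInstance)` returns every subquery node in the tree without a purpose-built helper.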

private LogicalPlan addScalarSubqueryCorrelatedJoins(ScalarSubquery scalarSubquery,
LogicalPlan childPlan, PlannerContext ctx) {
LogicalPlan enforce = new LogicalEnforceSingleRow<>(scalarSubquery.getQueryPlan());
scalarSubquery.setAnalyzed(true);
Contributor

@924060929 924060929 Aug 3, 2022

An immutable expression should not have a mutable field. If we need this kind of state, we should derive it from the expression type and recompute it when the children are replaced.

e.g.
UnboundExpression.isAnalyzed() = false.
other expression.isAnalyzed() = Suppliers.memoize(() -> children().stream().allMatch(Expression::isAnalyzed)).
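
The idea in this comment, deriving isAnalyzed from the children instead of storing a settable flag, can be sketched without any library as below. `DerivedStateExpr` and `UnboundSketchExpr` are hypothetical classes; the hand-rolled cache stands in for a memoizing supplier and, unlike Guava's `Suppliers.memoize`, makes no thread-safety guarantee.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical expression base class; the analyzed state is derived, never set from outside. */
class DerivedStateExpr {
    private final List<DerivedStateExpr> children;
    // Cache filled on first use. It is a pure function of the immutable children.
    private Boolean analyzed;

    DerivedStateExpr(DerivedStateExpr... children) {
        this.children = Arrays.asList(children);
    }

    boolean isAnalyzed() {
        if (analyzed == null) {
            analyzed = children.stream().allMatch(DerivedStateExpr::isAnalyzed);
        }
        return analyzed;
    }

    /** Replacing children builds a new node, so the derived state is recomputed for the copy. */
    DerivedStateExpr withChildren(DerivedStateExpr... newChildren) {
        return new DerivedStateExpr(newChildren);
    }
}

/** Hypothetical unbound expression: never analyzed, regardless of children. */
final class UnboundSketchExpr extends DerivedStateExpr {
    UnboundSketchExpr() {
        super();
    }

    @Override
    boolean isAnalyzed() {
        return false;
    }
}
```

Because nothing outside the node can flip the flag, a call such as `scalarSubquery.setAnalyzed(true)` has no equivalent in this design.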

ctx.getSubquery(exists.getQueryPlan()).getCorrelateSlots());
}

private LogicalPlan appendApplyNode(SubqueryExpr subqueryExpr, LogicalPlan childPlan,
Contributor

This function is not necessary: its body is a single line, it takes the same number of parameters as the call it wraps, and it is not really about 'append'.

- public Slot visitUnboundSlot(UnboundSlot unboundSlot, Void context) {
+ public Slot visitUnboundSlot(UnboundSlot unboundSlot, PlannerContext context) {
List<Slot> tmpBound = bindSlot(unboundSlot, getScope().getSlots());
boolean hasCorrelate = false;
Contributor

@924060929 924060929 Aug 3, 2022

At this point there should be a variable such as boolean foundInThisScope = !tmpBound.isEmpty();
If foundInThisScope is false, look in the outer scope; if the slot is found there, set boolean foundInOuterScope = true.

Then comes the switch statement: you can declare boolean hasCorrelate = foundInOuterScope, or use foundInOuterScope directly.

if (tmpBound.size() == 0) {
hasCorrelate = true;
}
Optional<List<Slot>> boundedOpt = getScope()
Contributor

@924060929 924060929 Aug 3, 2022

Why bind the slot in this scope again? I think it should be:

if (!foundInThisScope && getScope().getOuterScope().isPresent()) {
   Optional<List<Slot>> boundedOpt = getScope()
        .getOuterScope()
        .get()
        .toScopeLink()
        .stream()
        ...
}
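
The lookup order suggested in the two comments above (bind in the current scope first, fall back to outer scopes only when nothing was found, and treat an outer-scope hit as correlation) can be sketched as follows. `ScopeLookupSketch` models slots as plain strings and is entirely hypothetical; it is not the Scope class from this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class ScopeLookupSketch {
    /** Result of binding a name: the matched slots and whether they came from an outer scope. */
    static final class Binding {
        final List<String> slots;
        final boolean foundInOuterScope;

        Binding(List<String> slots, boolean foundInOuterScope) {
            this.slots = slots;
            this.foundInOuterScope = foundInOuterScope;
        }
    }

    private final List<String> slots;
    private final Optional<ScopeLookupSketch> outerScope;

    ScopeLookupSketch(List<String> slots, ScopeLookupSketch outerScope) {
        this.slots = slots;
        this.outerScope = Optional.ofNullable(outerScope);
    }

    private List<String> match(String name) {
        List<String> matches = new ArrayList<>();
        for (String slot : slots) {
            if (slot.equalsIgnoreCase(name)) {
                matches.add(slot);
            }
        }
        return matches;
    }

    Binding bind(String name) {
        List<String> bound = match(name);
        boolean foundInThisScope = !bound.isEmpty();
        if (foundInThisScope) {
            return new Binding(bound, false);
        }
        // Only walk the outer scopes when the current scope had no match.
        Optional<ScopeLookupSketch> scope = outerScope;
        while (scope.isPresent()) {
            List<String> outerBound = scope.get().match(name);
            if (!outerBound.isEmpty()) {
                return new Binding(outerBound, true);
            }
            scope = scope.get().outerScope;
        }
        return new Binding(Collections.<String>emptyList(), false);
    }
}
```

Here `foundInOuterScope` plays the role of `hasCorrelate`, so no second bind in the current scope is needed.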

return checkSlots.contains(slot);
}

public void addSlot(Slot slot) {
Contributor

checkSlots is final, so addSlot cannot add to it.

Contributor

Also, checkSlots could be renamed to slotSet.

@jacktengg
Contributor

Please rebase master to re-run P0 test

@wangshuo128
Contributor

I suggest first supporting the analysis and unnesting of one type of subquery, e.g., converting InSubquery to joins. That way we could have a unit test with which to review the framework and the main steps of unnesting subqueries.

@zhengshiJ zhengshiJ marked this pull request as draft August 9, 2022 07:10
@github-actions github-actions bot added the area/planner label (Issues or PRs related to the query planner) Aug 16, 2022
@zhengshiJ zhengshiJ marked this pull request as ready for review August 19, 2022 07:58
@zhengshiJ zhengshiJ force-pushed the subToapp branch 5 times, most recently from 47be108 to 89d0bc5 Compare August 31, 2022 02:05
@zhengshiJ zhengshiJ force-pushed the subToapp branch 3 times, most recently from 9347fc2 to 315945b Compare September 1, 2022 07:12
/**
* Use the visitor to iterate sub expression.
*/
private static class SubExprAnalyzer<C> extends DefaultExpressionRewriter<C> {
Contributor

Suggested change
- private static class SubExprAnalyzer<C> extends DefaultExpressionRewriter<C> {
+ private static class SubExprAnalyzer extends DefaultExpressionRewriter<PlannerContext> {

Contributor Author

done

assertNumRowsElement.getSubqueryString(), translateAsserTion(assertNumRowsElement.getAssertion()));
}

private static org.apache.doris.analysis.AssertNumRowsElement.Assertion translateAsserTion(
Contributor

Suggested change
- private static org.apache.doris.analysis.AssertNumRowsElement.Assertion translateAsserTion(
+ private static org.apache.doris.analysis.AssertNumRowsElement.Assertion translateAssertion(

Contributor Author

done

case GE:
return Assertion.GE;
default:
return null;
Contributor

Is returning null safe here? Should we throw an unsupported-type exception instead?

Contributor Author

done
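
The alternative raised in this thread, failing loudly in the default branch instead of returning null, might look like the sketch below. Both enums are hypothetical stand-ins, and the unmapped `LT` value exists only to exercise the default branch; the actual assertion enums and the exception type used in the merged code are not shown in this conversation.

```java
/** Hypothetical source-side assertion kinds; LT is deliberately left unmapped. */
enum SourceAssertionSketch { EQ, GE, LT }

/** Hypothetical target-side assertion kinds. */
enum TargetAssertionSketch { EQ, GE }

final class AssertionTranslationSketch {
    private AssertionTranslationSketch() {
    }

    static TargetAssertionSketch translate(SourceAssertionSketch assertion) {
        switch (assertion) {
            case EQ:
                return TargetAssertionSketch.EQ;
            case GE:
                return TargetAssertionSketch.GE;
            default:
                // Surfacing the gap here avoids a NullPointerException far from the cause.
                throw new UnsupportedOperationException("unsupported assertion type: " + assertion);
        }
    }
}
```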

}

/**
* Convert expressions with subqueries in filter.
Contributor

Suggested change
- * Convert expressions with subqueries in filter.
+ * The Subquery in the LogicalFilter will be changed to a LogicalApply, so we must replace the original Subquery
+ * in the LogicalFilter (that is, remove the original Subquery from the LogicalFilter).
+ *
+ * The replace rules are:

Contributor Author

done

return replaceSubquery((SubqueryExpr) oldPredicate);
}
return newChildren.isEmpty() ? Optional.of(oldPredicate) : Optional.of(oldPredicate.withChildren(
newChildren.stream().map(expr -> expr.get()).collect(Collectors.toList())));
Contributor

.map(expr -> expr.get())

If you do not check whether the Optional is present, why not remove the Optional? Or did you forget to filter for present Optionals here?

Contributor Author

done
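
One of the two options the reviewer raises, keeping the Optional but filtering out empty values before unwrapping, can be sketched as below. `OptionalChildrenSketch` is a hypothetical helper; the thread does not say which option the author actually chose.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class OptionalChildrenSketch {
    private OptionalChildrenSketch() {
    }

    /** Unwraps only the Optionals that hold a value, so get() can never throw. */
    static <T> List<T> presentValues(List<Optional<T>> candidates) {
        return candidates.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}
```

If every child is guaranteed to be present, dropping the Optional wrapper altogether is the simpler choice.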

public class ApplyPullFilterOnProjectUnderAgg extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalApply(any(), logicalAggregate(logicalProject(logicalFilter())))
Contributor

change any() to group()

Contributor Author

done

Comment on lines 61 to 64
LogicalAggregate<LogicalProject<LogicalFilter<GroupPlan>>> agg = apply.right();
if (!agg.getGroupByExpressions().isEmpty()) {
return apply;
}
Contributor

You can move this condition into the pattern.

logicalApply(
    group(),
    logicalAggregate(
        logicalProject(logicalFilter())
    ).when(agg -> agg.getGroupByExpressions().isEmpty())
).when(LogicalApply::isCorrelated).then(apply -> {})

Contributor Author

deleted

* |
* Filter(correlated predicate(Input.e = this.f)/Unapply predicate)
* |
* Project(output:a,child.output)
Contributor

Ditto: this project may add expressions that the original filter needs, so you should note that. Also, it is not 'child.output'.

Contributor Author

done

public class PushApplyUnderProject extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalApply(any(), logicalProject()).when(LogicalApply::isCorrelated).then(apply -> {
Contributor

change any() to group()

Contributor Author

done

* input queryPlan input queryPlan
*
* UnCorrelated -> CROSS_JOIN(Count(1))
* apply Filter(count(1) = 0)
Contributor

@924060929 924060929 Sep 1, 2022

Is the CROSS_JOIN missing here?
Should Count(1) be changed to Count(*)?

Should we also add a limit(1) for performance?

  filter(equal(cnt, 0))
          |
aggregate(count(*) as cnt)
          |
      limit(1)
          |
      queryPlan

count(1) is not the same as count(*), so this may be wrong.

Contributor Author

I forgot to update the comment here; the code uses count(*). A limit can be added.
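
The plan shape discussed here, filter(cnt = 0) over aggregate(count(*)) over limit(1), decides an uncorrelated Not Exists by looking at no more than one row. A minimal sketch of that evaluation over an in-memory collection follows; `NotExistsSketch` is a hypothetical illustration of the semantics, not planner code.

```java
import java.util.Collection;

final class NotExistsSketch {
    private NotExistsSketch() {
    }

    /** Mirrors filter(cnt = 0) over aggregate(count(*) as cnt) over limit(1). */
    static boolean notExists(Collection<?> subqueryRows) {
        // limit(1) caps the count at 1, so the aggregate never scans past the first row.
        long cnt = subqueryRows.stream().limit(1).count();
        return cnt == 0;
    }
}
```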

@924060929 924060929 merged commit 7f7a3a7 into apache:master Sep 2, 2022
Labels

area/nereids, area/planner (Issues or PRs related to the query planner), area/vectorization, kind/test

4 participants