Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.UnboundLogicalProperties;
Expand Down Expand Up @@ -143,6 +144,6 @@ public LogicalProperties computeLogicalProperties() {

@Override
public List<Slot> computeOutput() {
return child().getOutput();
throw new UnboundException("output");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;

import com.google.common.base.Preconditions;

import java.util.List;
import java.util.Optional;

/**
* unbound result sink
*/
public class UnboundResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Unbound, Sink {

public UnboundResultSink(CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, child);
}

public UnboundResultSink(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, groupExpression, logicalProperties, child);
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundResultSink only accepts one child");
return new UnboundResultSink<>(groupExpression, Optional.of(getLogicalProperties()), children.get(0));
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundResultSink(this, context);
}

@Override
public List<? extends Expression> getExpressions() {
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()");
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundResultSink<>(groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundResultSink only accepts one child");
return new UnboundResultSink<>(groupExpression, logicalProperties, children.get(0));

}

@Override
public List<Slot> computeOutput() {
throw new UnboundException("output");
}

@Override
public String toString() {
return Utils.toSqlString("UnboundResultSink[" + id.asInt() + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
Expand Down Expand Up @@ -318,6 +319,12 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
* sink Node, in lexicographical order
* ******************************************************************************************** */

@Override
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
return physicalResultSink.child().accept(this, context);
}

@Override
public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink,
PlanTranslatorContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
Expand Down Expand Up @@ -311,7 +312,12 @@ public LogicalPlan visitSingleStatement(SingleStatementContext ctx) {
@Override
public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) {
LogicalPlan plan = plan(ctx.query());
return withExplain(withOutFile(plan, ctx.outFileClause()), ctx.explain());
if (ctx.outFileClause() != null) {
plan = withOutFile(plan, ctx.outFileClause());
} else {
plan = new UnboundResultSink<>(plan);
}
return withExplain(plan, ctx.explain());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -102,6 +103,12 @@ public Void visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> ola
return null;
}

@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}

/* ********************************************************************************************
* Other Node, in lexicographical order
* ******************************************************************************************** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN;
import org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject;
import org.apache.doris.nereids.rules.implementation.LogicalRepeatToPhysicalRepeat;
import org.apache.doris.nereids.rules.implementation.LogicalResultSinkToPhysicalResultSink;
import org.apache.doris.nereids.rules.implementation.LogicalSchemaScanToPhysicalSchemaScan;
import org.apache.doris.nereids.rules.implementation.LogicalSortToPhysicalQuickSort;
import org.apache.doris.nereids.rules.implementation.LogicalTVFRelationToPhysicalTVFRelation;
Expand Down Expand Up @@ -161,6 +162,7 @@ public class RuleSet {
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.build();

public static final List<Rule> ZIG_ZAG_TREE_JOIN_REORDER = planRuleFactories()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum RuleType {
// binding rules

// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_NON_LEAF_LOGICAL_PLAN(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
BINDING_RELATION(RuleTypeClass.REWRITE),
Expand Down Expand Up @@ -299,6 +300,7 @@ public enum RuleType {
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
Expand Down Expand Up @@ -555,6 +556,14 @@ protected boolean condition(Rule rule, Plan plan) {
checkSameNameSlot(subQueryAlias.child(0).getOutput(), subQueryAlias.getAlias());
return subQueryAlias;
})
),
RuleType.BINDING_RESULT_SINK.build(
unboundResultSink().then(sink -> {
List<NamedExpression> outputExprs = sink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalResultSink<>(outputExprs, sink.child());
})
)
).stream().map(ruleCondition).collect(ImmutableList.toImmutableList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Unbound;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.pattern.MatchingContext;
Expand Down Expand Up @@ -238,6 +239,10 @@ private Plan parseAndAnalyzeHiveView(TableIf table, CascadesContext cascadesCont

private Plan parseAndAnalyzeView(String viewSql, CascadesContext parentContext) {
LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(viewSql);
// TODO: use a good to do this, such as eliminate UnboundResultSink
if (parsedViewPlan instanceof UnboundResultSink) {
parsedViewPlan = (LogicalPlan) ((UnboundResultSink<?>) parsedViewPlan).child();
}
CascadesContext viewContext = CascadesContext.initContext(
parentContext.getStatementContext(), parsedViewPlan, PhysicalProperties.ANY);
viewContext.newAnalyzer().analyze();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.implementation;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;

import java.util.Optional;

/**
* implement result sink.
*/
public class LogicalResultSinkToPhysicalResultSink extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalResultSink().thenApply(ctx -> {
LogicalResultSink<? extends Plan> sink = ctx.root;
return new PhysicalResultSink<>(
sink.getOutputExprs(),
Optional.empty(),
sink.getLogicalProperties(),
sink.child());
}).toRule(RuleType.LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public Plan visitLogicalCTEAnchor(LogicalCTEAnchor<? extends Plan, ? extends Pla
// currently, it's one of the olap table sink and file sink.
@Override
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, StatementContext context) {
Plan child = logicalSink.child().accept(this, context);
return logicalSink.withChildren(child);
return super.visit(logicalSink, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;

Expand Down Expand Up @@ -72,11 +71,4 @@ public LogicalCTEProducer<Plan> visitLogicalCTEProducer(LogicalCTEProducer<? ext
producers.addAll(childProducers);
return newProducer;
}

// we should keep that sink node is the top node of the plan tree.
// currently, it's one of the olap table sink and file sink.
@Override
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, List<LogicalCTEProducer<Plan>> producers) {
return logicalSink.withChildren(rewriteRoot(logicalSink.child(), producers));
}
}
Loading