Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnary;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;

Expand All @@ -37,7 +38,8 @@
/**
* Represent an olap table sink plan node that has not been bound.
*/
public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements Unbound {
public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Unbound, Sink {

private final List<String> nameParts;
private final List<String> colNames;
private final List<String> hints;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ?
return join;
}

public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext ctx) {
public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext ctx) {
scan.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
return scan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
Expand Down Expand Up @@ -236,11 +237,18 @@ public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project
}

@Override
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) {
public Plan visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, CascadesContext context) {
// TODO: OneRowRelation will be translated to union. Union node cannot apply runtime filter now
// so, just return itself now, until runtime filter could apply on any node.
return oneRowRelation;
}

@Override
public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, CascadesContext context) {
// add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan, slot)));
return scan;
relation.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(relation, slot)));
return relation;
}

private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public PhysicalFilter visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
}

@Override
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) {
public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext context) {
RuntimeFilterContext rfCtx = context.getRuntimeFilterContext();
List<Slot> slots = rfCtx.getTargetOnOlapScanNodeMap().get(scan.getRelationId());
if (slots != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
Expand All @@ -47,12 +46,12 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
Expand Down Expand Up @@ -92,26 +91,20 @@ public PhysicalProperties getOutputProperties(GroupExpression groupExpression) {
return groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
}

@Override
public PhysicalProperties visit(Plan plan, PlanContext context) {
return PhysicalProperties.ANY;
}

/* ********************************************************************************************
* sink Node, in lexicographical order
* ******************************************************************************************** */

@Override
public PhysicalProperties visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
public PhysicalProperties visitPhysicalSink(PhysicalSink<? extends Plan> physicalSink, PlanContext context) {
return PhysicalProperties.GATHER;
}

@Override
public PhysicalProperties visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink,
PlanContext context) {
return PhysicalProperties.GATHER;
}

@Override
public PhysicalProperties visit(Plan plan, PlanContext context) {
return PhysicalProperties.ANY;
}

/* ********************************************************************************************
* Leaf Plan Node, in lexicographical order
* ******************************************************************************************** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
Expand Down Expand Up @@ -63,16 +62,9 @@ public Plan visitLogicalCTEAnchor(LogicalCTEAnchor<? extends Plan, ? extends Pla
// we should keep that sink node is the top node of the plan tree.
// currently, it's one of the olap table sink and file sink.
@Override
public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink,
StatementContext context) {
Plan child = olapTableSink.child().accept(this, context);
return olapTableSink.withChildren(child);
}

@Override
public Plan visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, StatementContext context) {
Plan child = fileSink.child().accept(this, context);
return fileSink.withChildren(child);
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, StatementContext context) {
Plan child = logicalSink.child().accept(this, context);
return logicalSink.withChildren(child);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.logical.OutputPrunable;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
Expand Down Expand Up @@ -162,13 +161,8 @@ public Plan visitLogicalIntersect(LogicalIntersect intersect, PruneContext conte
}

@Override
public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink, PruneContext context) {
return skipPruneThisAndFirstLevelChildren(olapTableSink);
}

@Override
public Plan visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, PruneContext context) {
return skipPruneThisAndFirstLevelChildren(fileSink);
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, PruneContext context) {
return skipPruneThisAndFirstLevelChildren(logicalSink);
}

// the backend not support filter(project(agg)), so we can not prune the key set in the agg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;

Expand Down Expand Up @@ -77,14 +76,7 @@ public LogicalCTEProducer<Plan> visitLogicalCTEProducer(LogicalCTEProducer<? ext
// we should keep that sink node is the top node of the plan tree.
// currently, it's one of the olap table sink and file sink.
@Override
public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink,
List<LogicalCTEProducer<Plan>> producers) {
return olapTableSink.withChildren(rewriteRoot(olapTableSink.child(), producers));
}

@Override
public Plan visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink,
List<LogicalCTEProducer<Plan>> producers) {
return fileSink.withChildren(rewriteRoot(fileSink.child(), producers));
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, List<LogicalCTEProducer<Plan>> producers) {
return logicalSink.withChildren(rewriteRoot(logicalSink.child(), producers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,19 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
Expand All @@ -89,7 +88,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
Expand All @@ -99,13 +97,13 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
Expand Down Expand Up @@ -242,12 +240,7 @@ we record the lowest expression cost as group cost to avoid missing this group.
}

@Override
public Statistics visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink, Void context) {
return groupExpression.childStatistics(0);
}

@Override
public Statistics visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, Void context) {
public Statistics visitLogicalSink(LogicalSink<? extends Plan> logicalSink, Void context) {
return groupExpression.childStatistics(0);
}

Expand Down Expand Up @@ -379,12 +372,7 @@ public Statistics visitLogicalWindow(LogicalWindow<? extends Plan> window, Void
}

@Override
public Statistics visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, Void context) {
return groupExpression.childStatistics(0);
}

@Override
public Statistics visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, Void context) {
public Statistics visitPhysicalSink(PhysicalSink<? extends Plan> physicalSink, Void context) {
return groupExpression.childStatistics(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
Expand All @@ -48,13 +47,13 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
Expand Down Expand Up @@ -375,17 +374,9 @@ public Plan visitLogicalWindow(LogicalWindow<? extends Plan> window, DeepCopierC
}

@Override
public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink,
DeepCopierContext context) {
Plan child = olapTableSink.child().accept(this, context);
return new LogicalOlapTableSink<>(olapTableSink.getDatabase(), olapTableSink.getTargetTable(),
olapTableSink.getCols(), olapTableSink.getPartitionIds(), child);
}

@Override
public Plan visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, DeepCopierContext context) {
Plan child = fileSink.child().accept(this, context);
return fileSink.withChildren(child);
public Plan visitLogicalSink(LogicalSink<? extends Plan> logicalSink, DeepCopierContext context) {
Plan child = logicalSink.child().accept(this, context);
return logicalSink.withChildren(child);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.algebra;

/**
* traits for all sink
*/
public interface Sink {
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;

import com.google.common.base.Preconditions;
Expand All @@ -37,7 +38,8 @@
/**
* logicalFileSink for select into outfile
*/
public class LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> {
public class LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Sink {

private final String filePath;
private final String format;
private final Map<String, String> properties;
Expand Down
Loading