From 20271f697809851e0221afc1d3d53618b6f014b9 Mon Sep 17 00:00:00 2001 From: morrySnow Date: Fri, 30 Aug 2024 14:28:33 +0800 Subject: [PATCH] [fix](Nereids) handle continuous filter or project in plan if we meet continuous project or filter in translator, we try to generate SelectNode as far as possible to avoid generate invalid plan for example ``` Filter(conjuncts 1) +-- Limit (limit 10) +-- Filter(conjuncts 2) +-- Aggregate ``` will be translated to ``` SELECT_NODE (conjuncts 1) +-- AGGREGATE_NODE (conjuncts 2) (limit 10) ``` --- .../translator/PhysicalPlanTranslator.java | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 1201bc6e32cc72..3f6d5ccf35642b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -718,7 +718,6 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(), tupleDescriptor, table instanceof JdbcExternalTable); jdbcScanNode.setNereidsId(jdbcScan.getId()); - jdbcScanNode.addConjuncts(translateToLegacyConjuncts(jdbcScan.getConjuncts())); Utils.execWithUncheckedException(jdbcScanNode::init); context.addScanNode(jdbcScanNode, jdbcScan); context.getRuntimeTranslator().ifPresent( @@ -742,7 +741,6 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(), tupleDescriptor, (OdbcTable) table); odbcScanNode.setNereidsId(odbcScan.getId()); - odbcScanNode.addConjuncts(translateToLegacyConjuncts(odbcScan.getConjuncts())); Utils.execWithUncheckedException(odbcScanNode::init); context.addScanNode(odbcScanNode, odbcScan); context.getRuntimeTranslator().ifPresent( @@ -1256,6 +1254,12 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts()) + || CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + filter.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } filter.getConjuncts().stream() .map(e -> ExpressionTranslator.translate(e, context)) .forEach(dataStreamSink::addConjunct); @@ -1263,24 +1267,28 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter filter, P } PlanNode planNode = inputFragment.getPlanRoot(); - Plan child = filter.child(); - while (child instanceof PhysicalLimit) { - child = ((PhysicalLimit) child).child(); - } - if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode - // this means we have filter->limit->project, need a SelectNode - || child instanceof PhysicalProject) { - // the three nodes don't support conjuncts, need create a SelectNode to filter data + // the three nodes don't support conjuncts, need create a SelectNode to filter data + if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) { SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode); selectNode.setNereidsId(filter.getId()); addConjunctsToPlanNode(filter, selectNode, context); addPlanRoot(inputFragment, selectNode, filter); } else { if (!(filter.child(0) instanceof AbstractPhysicalJoin)) { + // already have filter on this node, we should not override it, so need a new node + if (!planNode.getConjuncts().isEmpty() + // already have project on this node, filter need execute after project, so need a new node + || CollectionUtils.isNotEmpty(planNode.getProjectList()) + // already have limit on this node, filter need execute after limit, so need a new node + || planNode.hasLimit()) { + planNode = new SelectNode(context.nextPlanNodeId(), planNode); + planNode.setNereidsId(filter.getId()); + addPlanRoot(inputFragment, planNode, filter); + } addConjunctsToPlanNode(filter, planNode, context); - updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); } } + updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter); // in ut, filter.stats may be null if (filter.getStats() != null) { inputFragment.getPlanRoot().setCardinalityAfterFilter((long) filter.getStats().getRowCount()); @@ -1864,8 +1872,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject project } PlanFragment inputFragment = project.child(0).accept(this, context); - PlanNode inputPlanNode = inputFragment.getPlanRoot(); + // this means already have project on this node, filter need execute after project, so need a new node + if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) { + SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode); + selectNode.setNereidsId(project.getId()); + addPlanRoot(inputFragment, selectNode, project); + inputPlanNode = selectNode; + } + List projectionExprs = null; List allProjectionExprs = Lists.newArrayList(); List slots = null; @@ -1903,6 +1918,11 @@ public PlanFragment visitPhysicalProject(PhysicalProject project MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); + if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) { + String errMsg = "generate invalid plan \n" + project.treeString(); + LOG.warn(errMsg); + throw new AnalysisException(errMsg); + } TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context); dataStreamSink.setProjections(projectionExprs); dataStreamSink.setOutputTupleDesc(projectionTuple);