From 3fbb022a59ba2c6676c51e208a59f7a632b3b3e5 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 15 Nov 2023 14:52:28 +0800 Subject: [PATCH] [fix](nereids)fix bug that query infomation_schema.rowsets fe send fragment to one of muilti be. --- .../glue/translator/PhysicalPlanTranslator.java | 15 ++++++++++++--- .../planner/BackendPartitionedSchemaScanNode.java | 7 +++++++ .../org/apache/doris/planner/SchemaScanNode.java | 2 +- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 58e0cd80c74ce4..28d28a10d4fc97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -139,6 +139,7 @@ import org.apache.doris.planner.AggregationNode; import org.apache.doris.planner.AnalyticEvalNode; import org.apache.doris.planner.AssertNumRowsNode; +import org.apache.doris.planner.BackendPartitionedSchemaScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.EmptySetNode; @@ -711,13 +712,21 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT Table table = schemaScan.getTable(); List slots = ImmutableList.copyOf(schemaScan.getOutput()); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); - SchemaScanNode scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + SchemaScanNode scanNode = null; + if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable( + table.getName())) { + scanNode = new BackendPartitionedSchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } else { + scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } + SchemaScanNode finalScanNode = scanNode; context.getRuntimeTranslator().ifPresent( runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId()) - .forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) + .forEach(expr -> runtimeFilterGenerator + .translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - scanNode.finalizeForNereids(); + Utils.execWithUncheckedException(scanNode::finalizeForNereids); context.addScanNode(scanNode); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 53f48c60e8fa59..592fc3c96cfe9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -88,6 +88,13 @@ public void finalize(Analyzer analyzer) throws UserException { createScanRangeLocations(); } + @Override + public void finalizeForNereids() throws UserException { + computeColumnsFilter(); + computePartitionInfo(); + createScanRangeLocations(); + } + @Override public List getScanRangeLocations(long maxScanRangeLength) { return scanRangeLocations; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 595d09792fe99b..1e7d339bc40955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -85,7 +85,7 @@ public void finalize(Analyzer analyzer) throws UserException { } @Override - public void finalizeForNereids() { + public void finalizeForNereids() throws UserException { // Convert predicates to MySQL columns and filters. frontendIP = FrontendOptions.getLocalHostAddress(); frontendPort = Config.rpc_port;