From dcbdf4e4b155792652234a8578f3ed33b0519fc6 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 15 Nov 2023 14:52:28 +0800 Subject: [PATCH 1/3] [fix](nereids)fix bug that query infomation_schema.rowsets fe send fragment to one of muilti be. --- .../glue/translator/PhysicalPlanTranslator.java | 15 ++++++++++++--- .../planner/BackendPartitionedSchemaScanNode.java | 7 +++++++ .../org/apache/doris/planner/SchemaScanNode.java | 2 +- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 53ffb36e1e57ed..79b16cd46db600 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -140,6 +140,7 @@ import org.apache.doris.planner.AggregationNode; import org.apache.doris.planner.AnalyticEvalNode; import org.apache.doris.planner.AssertNumRowsNode; +import org.apache.doris.planner.BackendPartitionedSchemaScanNode; import org.apache.doris.planner.CTEScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataStreamSink; @@ -725,13 +726,21 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT Table table = schemaScan.getTable(); List slots = ImmutableList.copyOf(schemaScan.getOutput()); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); - SchemaScanNode scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + SchemaScanNode scanNode = null; + if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable( + table.getName())) { + scanNode = new BackendPartitionedSchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } else { + scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } + SchemaScanNode finalScanNode = scanNode; context.getRuntimeTranslator().ifPresent( runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId()) - .forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) + .forEach(expr -> runtimeFilterGenerator + .translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - scanNode.finalizeForNereids(); + Utils.execWithUncheckedException(scanNode::finalizeForNereids); context.addScanNode(scanNode); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 53f48c60e8fa59..592fc3c96cfe9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -88,6 +88,13 @@ public void finalize(Analyzer analyzer) throws UserException { createScanRangeLocations(); } + @Override + public void finalizeForNereids() throws UserException { + computeColumnsFilter(); + computePartitionInfo(); + createScanRangeLocations(); + } + @Override public List getScanRangeLocations(long maxScanRangeLength) { return scanRangeLocations; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 595d09792fe99b..1e7d339bc40955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -85,7 +85,7 @@ public void finalize(Analyzer analyzer) throws UserException { } @Override - public void finalizeForNereids() { + public void finalizeForNereids() throws UserException { // Convert predicates to MySQL columns and filters. frontendIP = FrontendOptions.getLocalHostAddress(); frontendPort = Config.rpc_port; From 52e7319a11de87246bc7913e6ae6fcaa49c8b3ea Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 15 Nov 2023 15:58:29 +0800 Subject: [PATCH 2/3] add comment --- .../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 79b16cd46db600..5d92125263e1b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -726,6 +726,9 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT Table table = schemaScan.getTable(); List slots = ImmutableList.copyOf(schemaScan.getOutput()); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); + + // For the information_schema.rowsets table, the scan fragment needs to be sent to all BEs. + // For other information_schema tables, the scan fragment only needs to be sent to one of the BEs. SchemaScanNode scanNode = null; if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable( table.getName())) { From 4ea18bd481ef2849ed9372818cc0f6d36ed46136 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 15 Nov 2023 19:45:38 +0800 Subject: [PATCH 3/3] fix format --- .../src/main/java/org/apache/doris/planner/SchemaScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 1e7d339bc40955..4a8a488dfc3963 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -85,7 +85,7 @@ public void finalize(Analyzer analyzer) throws UserException { } @Override - public void finalizeForNereids() throws UserException { + public void finalizeForNereids() throws UserException { // Convert predicates to MySQL columns and filters. frontendIP = FrontendOptions.getLocalHostAddress(); frontendPort = Config.rpc_port;