diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7af2696f137ebc..34a9b1f8f4e639 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -664,14 +664,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot); fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams); } - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(fileScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(scanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } @@ -773,21 +765,7 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca } Utils.execWithUncheckedException(scanNode::init); context.addScanNode(scanNode, fileScan); - ScanNode finalScanNode = scanNode; - context.getRuntimeTranslator().ifPresent( - runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) - ) - ); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(fileScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(scanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } - context.getTopnFilterContext().translateTarget(fileScan, scanNode, context); + translateRuntimeFilter(fileScan, scanNode, context); // Create PlanFragment DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); @@ -807,21 +785,7 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(), jdbcScanNode.getId()); Utils.execWithUncheckedException(jdbcScanNode::init); context.addScanNode(jdbcScanNode, jdbcScan); - context.getRuntimeTranslator().ifPresent( - runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(jdbcScan).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context) - ) - ); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(jdbcScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(jdbcScanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } - - context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context); + translateRuntimeFilter(jdbcScan, jdbcScanNode, context); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition); context.addPlanFragment(planFragment); @@ -840,21 +804,7 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(), odbcScanNode.getId()); Utils.execWithUncheckedException(odbcScanNode::init); context.addScanNode(odbcScanNode, odbcScan); - context.getRuntimeTranslator().ifPresent( - runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(odbcScan).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context) - ) - ); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(odbcScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(odbcScanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } - context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context); - context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context); + translateRuntimeFilter(odbcScan, odbcScanNode, context); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition); context.addPlanFragment(planFragment); @@ -976,25 +926,10 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran // create scan range Utils.execWithUncheckedException(olapScanNode::init); - // TODO: process collect scan node in one place context.addScanNode(olapScanNode, olapScan); - // TODO: process translate runtime filter in one place - // use real plan node to present rf apply and rf generator - context.getRuntimeTranslator().ifPresent( - runtimeFilterTranslator -> runtimeFilterTranslator.getContext().getTargetListByScan(olapScan) - .forEach(expr -> runtimeFilterTranslator.translateRuntimeFilterTarget( - expr, olapScanNode, context) - ) - ); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(olapScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(olapScanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } - context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context); + + translateRuntimeFilter(olapScan, olapScanNode, context); + olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId())); // Create PlanFragment // TODO: use a util function to convert distribution to DataPartition @@ -1015,6 +950,26 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran return planFragment; } + private void translateRuntimeFilter(PhysicalRelation physicalRelation, ScanNode scanNode, + PlanTranslatorContext context) { + if (context.getRuntimeTranslator().isPresent()) { + RuntimeFilterTranslator runtimeFilterTranslator = context.getRuntimeTranslator().get(); + for (Slot slot : runtimeFilterTranslator.getContext().getTargetListByScan(physicalRelation)) { + runtimeFilterTranslator.translateRuntimeFilterTarget(slot, scanNode, context); + } + } + + // translate rf v2 target + List rfV2s = context.getRuntimeFilterV2Context() + .getRuntimeFilterV2ByTargetPlan(physicalRelation); + for (RuntimeFilterV2 rfV2 : rfV2s) { + Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); + rfV2.setLegacyTargetNode(scanNode); + rfV2.setLegacyTargetExpr(targetExpr); + } + context.getTopnFilterContext().translateTarget(physicalRelation, scanNode, context); + } + @Override public PlanFragment visitPhysicalDeferMaterializeOlapScan( PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) { @@ -1090,21 +1045,9 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT } scanNode.setNereidsId(schemaScan.getId()); context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(), scanNode.getId()); - SchemaScanNode finalScanNode = scanNode; - context.getRuntimeTranslator().ifPresent( - runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(schemaScan) - .forEach(expr -> runtimeFilterGenerator - .translateRuntimeFilterTarget(expr, finalScanNode, context) - ) - ); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(schemaScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(scanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } + + translateRuntimeFilter(schemaScan, scanNode, context); + context.addScanNode(scanNode, schemaScan); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); @@ -1419,17 +1362,7 @@ public PlanFragment visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer, } } CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor); - context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> - runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach( - expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context))); - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(cteConsumer); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(cteScanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } + translateRuntimeFilter(cteConsumer, cteScanNode, context); context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode); return multiCastFragment; @@ -2800,15 +2733,8 @@ public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterialize olapScanNode.addTopnLazyMaterializeOutputColumns(((SlotReference) slot).getOriginalColumn().get()); } } - // translate rf v2 target - List rfV2s = context.getRuntimeFilterV2Context() - .getRuntimeFilterV2ByTargetPlan(lazyScan); - for (RuntimeFilterV2 rfV2 : rfV2s) { - Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context); - rfV2.setLegacyTargetNode(olapScanNode); - rfV2.setLegacyTargetExpr(targetExpr); - } - context.getTopnFilterContext().translateTarget(lazyScan, olapScanNode, context); + + translateRuntimeFilter(lazyScan, olapScanNode, context); return planFragment; } diff --git a/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy b/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy index a8cede02d68423..79faad12a1145b 100644 --- a/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy +++ b/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy @@ -19,6 +19,7 @@ suite("topn_lazy") { sql """ set topn_lazy_materialization_threshold=1024; set runtime_filter_mode=GLOBAL; + set runtime_filter_type=BLOOM_FILTER; set TOPN_FILTER_RATIO=0.5; set disable_join_reorder=true; """ @@ -64,6 +65,8 @@ suite("topn_lazy") { contains("column_descs_lists[[`lo_orderkey` bigint NOT NULL, `lo_linenumber` bigint NOT NULL, `lo_custkey` int NOT NULL, `lo_partkey` int NOT NULL, `lo_suppkey` int NOT NULL, `lo_orderpriority` varchar(16) NOT NULL, `lo_shippriority` int NOT NULL, `lo_quantity` bigint NOT NULL, `lo_extendedprice` bigint NOT NULL, `lo_ordtotalprice` bigint NOT NULL, `lo_discount` bigint NOT NULL, `lo_revenue` bigint NOT NULL, `lo_supplycost` bigint NOT NULL, `lo_tax` bigint NOT NULL, `lo_commitdate` bigint NOT NULL, `lo_shipmode` varchar(11) NOT NULL], [`d_dayofweek` varchar(10) NOT NULL, `d_month` varchar(11) NOT NULL, `d_year` int NOT NULL, `d_yearmonthnum` int NOT NULL, `d_yearmonth` varchar(9) NOT NULL, `d_daynuminweek` int NOT NULL, `d_daynuminmonth` int NOT NULL, `d_daynuminyear` int NOT NULL, `d_monthnuminyear` int NOT NULL, `d_weeknuminyear` int NOT NULL, `d_sellingseason` varchar(14) NOT NULL, `d_lastdayinweekfl` int NOT NULL, `d_lastdayinmonthfl` int NOT NULL, `d_holidayfl` int NOT NULL, `d_weekdayfl` int NOT NULL]]") contains("locations: [[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18], [19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33]") contains("row_ids: [__DORIS_GLOBAL_ROWID_COL__lineorder, __DORIS_GLOBAL_ROWID_COL__date]") + contains("runtime filters: RF000[bloom] -> lo_orderdate") + } @@ -105,7 +108,7 @@ suite("topn_lazy") { multiContains("VMaterializeNode", 1) } - explain { + explain { sql """ select * from customer left semi join ( @@ -215,10 +218,6 @@ suite("topn_lazy") { ORDER BY u.user_id LIMIT 5; """ - // Cleanup tables - sql """ DROP TABLE IF EXISTS users """ - sql """ DROP TABLE IF EXISTS orders """ - sql """ drop table if exists table_100_undef_partitions2_keys3_properties4_distributed_by52; create table table_100_undef_partitions2_keys3_properties4_distributed_by52 (