Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -664,14 +664,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot);
fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams);
}
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(fileScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
}

Expand Down Expand Up @@ -773,21 +765,7 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
}
Utils.execWithUncheckedException(scanNode::init);
context.addScanNode(scanNode, fileScan);
ScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(fileScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
translateRuntimeFilter(fileScan, scanNode, context);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
Expand All @@ -807,21 +785,7 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(), jdbcScanNode.getId());
Utils.execWithUncheckedException(jdbcScanNode::init);
context.addScanNode(jdbcScanNode, jdbcScan);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(jdbcScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(jdbcScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(jdbcScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}

context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
translateRuntimeFilter(jdbcScan, jdbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand All @@ -840,21 +804,7 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla
context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(), odbcScanNode.getId());
Utils.execWithUncheckedException(odbcScanNode::init);
context.addScanNode(odbcScanNode, odbcScan);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(odbcScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(odbcScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(odbcScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
translateRuntimeFilter(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand Down Expand Up @@ -976,25 +926,10 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran

// create scan range
Utils.execWithUncheckedException(olapScanNode::init);
// TODO: process collect scan node in one place
context.addScanNode(olapScanNode, olapScan);
// TODO: process translate runtime filter in one place
// use real plan node to present rf apply and rf generator
context.getRuntimeTranslator().ifPresent(
runtimeFilterTranslator -> runtimeFilterTranslator.getContext().getTargetListByScan(olapScan)
.forEach(expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(
expr, olapScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(olapScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(olapScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);

translateRuntimeFilter(olapScan, olapScanNode, context);

olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
Expand All @@ -1015,6 +950,26 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran
return planFragment;
}

private void translateRuntimeFilter(PhysicalRelation physicalRelation, ScanNode scanNode,
PlanTranslatorContext context) {
if (context.getRuntimeTranslator().isPresent()) {
RuntimeFilterTranslator runtimeFilterTranslator = context.getRuntimeTranslator().get();
for (Slot slot : runtimeFilterTranslator.getContext().getTargetListByScan(physicalRelation)) {
runtimeFilterTranslator.translateRuntimeFilterTarget(slot, scanNode, context);
}
}

// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(physicalRelation);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(physicalRelation, scanNode, context);
}

@Override
public PlanFragment visitPhysicalDeferMaterializeOlapScan(
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) {
Expand Down Expand Up @@ -1090,21 +1045,9 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
}
scanNode.setNereidsId(schemaScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(), scanNode.getId());
SchemaScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(schemaScan)
.forEach(expr -> runtimeFilterGenerator
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(schemaScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}

translateRuntimeFilter(schemaScan, scanNode, context);

context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
Expand Down Expand Up @@ -1419,17 +1362,7 @@ public PlanFragment visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer,
}
}
CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach(
expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context)));
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(cteConsumer);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(cteScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
translateRuntimeFilter(cteConsumer, cteScanNode, context);
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode);

return multiCastFragment;
Expand Down Expand Up @@ -2800,15 +2733,8 @@ public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterialize
olapScanNode.addTopnLazyMaterializeOutputColumns(((SlotReference) slot).getOriginalColumn().get());
}
}
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(lazyScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(olapScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(lazyScan, olapScanNode, context);

translateRuntimeFilter(lazyScan, olapScanNode, context);

return planFragment;
}
Expand Down
9 changes: 4 additions & 5 deletions regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ suite("topn_lazy") {
sql """
set topn_lazy_materialization_threshold=1024;
set runtime_filter_mode=GLOBAL;
set runtime_filter_type=BLOOM_FILTER;
set TOPN_FILTER_RATIO=0.5;
set disable_join_reorder=true;
"""
Expand Down Expand Up @@ -64,6 +65,8 @@ suite("topn_lazy") {
contains("column_descs_lists[[`lo_orderkey` bigint NOT NULL, `lo_linenumber` bigint NOT NULL, `lo_custkey` int NOT NULL, `lo_partkey` int NOT NULL, `lo_suppkey` int NOT NULL, `lo_orderpriority` varchar(16) NOT NULL, `lo_shippriority` int NOT NULL, `lo_quantity` bigint NOT NULL, `lo_extendedprice` bigint NOT NULL, `lo_ordtotalprice` bigint NOT NULL, `lo_discount` bigint NOT NULL, `lo_revenue` bigint NOT NULL, `lo_supplycost` bigint NOT NULL, `lo_tax` bigint NOT NULL, `lo_commitdate` bigint NOT NULL, `lo_shipmode` varchar(11) NOT NULL], [`d_dayofweek` varchar(10) NOT NULL, `d_month` varchar(11) NOT NULL, `d_year` int NOT NULL, `d_yearmonthnum` int NOT NULL, `d_yearmonth` varchar(9) NOT NULL, `d_daynuminweek` int NOT NULL, `d_daynuminmonth` int NOT NULL, `d_daynuminyear` int NOT NULL, `d_monthnuminyear` int NOT NULL, `d_weeknuminyear` int NOT NULL, `d_sellingseason` varchar(14) NOT NULL, `d_lastdayinweekfl` int NOT NULL, `d_lastdayinmonthfl` int NOT NULL, `d_holidayfl` int NOT NULL, `d_weekdayfl` int NOT NULL]]")
contains("locations: [[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18], [19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33]")
contains("row_ids: [__DORIS_GLOBAL_ROWID_COL__lineorder, __DORIS_GLOBAL_ROWID_COL__date]")
contains("runtime filters: RF000[bloom] -> lo_orderdate")

}


Expand Down Expand Up @@ -105,7 +108,7 @@ suite("topn_lazy") {
multiContains("VMaterializeNode", 1)
}

explain {
explain {
sql """ select *
from
customer left semi join (
Expand Down Expand Up @@ -215,10 +218,6 @@ suite("topn_lazy") {
ORDER BY u.user_id LIMIT 5;
"""

// Cleanup tables
sql """ DROP TABLE IF EXISTS users """
sql """ DROP TABLE IF EXISTS orders """

sql """
drop table if exists table_100_undef_partitions2_keys3_properties4_distributed_by52;
create table table_100_undef_partitions2_keys3_properties4_distributed_by52 (
Expand Down
Loading