From b36b19786aab22d781ba41d71377d0ab3cbe6d3e Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 16 Jan 2025 21:29:50 +0800 Subject: [PATCH 1/5] fix missing rf when target not exist on backend where the producer is located fix update fix update update fix update case enlarge timeout update --- be/src/exprs/runtime_filter.cpp | 15 +- .../join/test_low_bucket/test_low_bucket.out | 151 ++++++++++ .../test_low_bucket/test_low_bucket.groovy | 284 ++++++++++++++++++ 3 files changed, 442 insertions(+), 8 deletions(-) create mode 100644 regression-test/data/query_p0/join/test_low_bucket/test_low_bucket.out create mode 100644 regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 58ec8cce5c4993..b46e654f6b1188 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -323,10 +323,10 @@ Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { return Status::OK(); }; auto do_merge = [&]() { - if (!_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { - LocalMergeFilters* local_merge_filters = nullptr; - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _filter_id, &local_merge_filters)); + if (auto* local_merge_filters = + _state->global_runtime_filter_mgr()->get_local_merge_producer_filters( + _filter_id); + local_merge_filters) { local_merge_filters->merge_watcher.start(); std::lock_guard l(*local_merge_filters->lock); RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get())); @@ -417,10 +417,9 @@ class SyncSizeClosure : public AutoReleaseClosureglobal_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { - LocalMergeFilters* local_merge_filters = nullptr; - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _filter_id, &local_merge_filters)); + if (auto* local_merge_filters = + 
_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(_filter_id); + local_merge_filters) { std::lock_guard l(*local_merge_filters->lock); local_merge_filters->merge_size_times--; local_merge_filters->local_merged_size += local_filter_size; diff --git a/regression-test/data/query_p0/join/test_low_bucket/test_low_bucket.out b/regression-test/data/query_p0/join/test_low_bucket/test_low_bucket.out new file mode 100644 index 00000000000000..e9f9e894ac82c2 --- /dev/null +++ b/regression-test/data/query_p0/join/test_low_bucket/test_low_bucket.out @@ -0,0 +1,151 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + +-- !test -- +480 + diff --git a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy new file mode 100644 index 00000000000000..7e39804c17ed7a --- /dev/null +++ b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy 
@@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_low_bucket") { + sql "set runtime_filter_wait_infinitely = true" + sql "set parallel_pipeline_task_num = 4" + + sql """ DROP TABLE IF EXISTS ads_income_statistics; """ + sql """ DROP TABLE IF EXISTS eq_group; """ + sql """ DROP TABLE IF EXISTS eq_label_setting_relation; """ + + // ads_income_statistics must be set to 1 buckets to reproduce the problem + sql """ + CREATE TABLE ads_income_statistics ( + statistics_date DATE, + distributor_id INT, + eq_group_id INT, + eq_id INT, + other_column VARCHAR(100) -- 示例其他列 + ) duplicate key (statistics_date) + PARTITION BY RANGE(`statistics_date`) + (PARTITION p20210117 VALUES [('2025-01-01'), ('2025-01-10')), + PARTITION p20210118 VALUES [('2025-01-10'), ('2025-01-19'))) + distributed BY hash(statistics_date) buckets 1 + properties("replication_num" = "1"); + """ + + sql """ + CREATE TABLE eq_group ( + eq_group_id INT, + group_name VARCHAR(100) -- 示例列 + ) duplicate key (eq_group_id) + properties("replication_num" = "1"); + """ + + sql """ + CREATE TABLE eq_label_setting_relation ( + eq_id INT, + eq_label_setting_id INT + ) duplicate key (eq_id) + properties("replication_num" = "1"); + """ + + sql 
""" + INSERT INTO ads_income_statistics + (statistics_date, distributor_id, eq_group_id, eq_id, other_column) + VALUES + ('2025-01-01', 1537918, 2000, 3000, 'test1'), + ('2025-01-10', 1537918, 2001, 3001, 'test2'), + ('2025-01-05', 1537918, 1000, 3002, 'test3'), -- 被排除的记录 + ('2025-01-12', 1537918, 2000, 3003, 'test4'), + ('2025-01-01', 1537918, 2000, 3000, 'test1'), + ('2025-01-10', 1537918, 2001, 3001, 'test2'), + ('2025-01-05', 1537918, 1000, 3002, 'test3'), -- 被排除的记录 + ('2025-01-12', 1537918, 2000, 3003, 'test4'), + ('2025-01-01', 1537918, 2000, 3000, 'test1'), + ('2025-01-10', 1537918, 2001, 3001, 'test2'), + ('2025-01-05', 1537918, 1000, 3002, 'test3'), -- 被排除的记录 + ('2025-01-12', 1537918, 2000, 3003, 'test4'), + ('2025-01-01', 1537918, 2000, 3000, 'test1'), + ('2025-01-10', 1537918, 2001, 3001, 'test2'), + ('2025-01-05', 1537918, 1000, 3002, 'test3'), -- 被排除的记录 + ('2025-01-12', 1537918, 2000, 3003, 'test4'); + """ + + sql """ + INSERT INTO eq_group (eq_group_id, group_name) + VALUES + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'), + (2000, 'Group A'), + (2001, 'Group B'), + (1000, 'Excluded Group'); -- 被排除的组 + """ + + sql """ + INSERT INTO eq_label_setting_relation (eq_id, eq_label_setting_id) + VALUES + (3000, 740960), -- 满足条件的记录 + (3001, 740960), -- 满足条件的记录 + (3002, 123456), -- 不满足条件 + (3003, 740960), + (3000, 740960), -- 满足条件的记录 + (3001, 740960), -- 满足条件的记录 + (3002, 123456), -- 不满足条件 + (3003, 740960), + (3000, 740960), -- 满足条件的记录 + (3001, 740960), -- 满足条件的记录 + (3002, 123456), -- 不满足条件 + (3003, 740960), + (3000, 740960), -- 
满足条件的记录 + (3001, 740960), -- 满足条件的记录 + (3002, 123456), -- 不满足条件 + (3003, 740960), + (3000, 740960), -- 满足条件的记录 + (3001, 740960), -- 满足条件的记录 + (3002, 123456), -- 不满足条件 + (3003, 740960); -- 满足条件的记录 + """ + + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND 
er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic 
WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) 
SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = 
er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a 
AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND 
ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id 
JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND 
ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' 
AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a 
ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND 
er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ + qt_test """ + WITH tb_a AS ( SELECT * FROM ads_income_statistics ic WHERE ic.statistics_date >= '2025-01-01' AND ic.statistics_date < '2025-01-15' AND ic.distributor_id = 1537918 AND ic.eq_group_id != 1000 ) SELECT count(*) FROM eq_group eg JOIN tb_a ic ON eg.eq_group_id = ic.eq_group_id JOIN[shuffle] eq_label_setting_relation er ON ic.eq_id = er.eq_id WHERE ic.distributor_id = 1537918 AND er.eq_label_setting_id IN (740960) AND ic.eq_group_id != 1000; + """ +} From a87730f07c0f066c6ee14f6de0257513c96afa0f Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 17 Jan 2025 21:08:57 +0800 Subject: [PATCH 2/5] fix case --- .../suites/query_p0/join/test_low_bucket/test_low_bucket.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy index 7e39804c17ed7a..db5b5ae13dcb1c 100644 --- 
a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy +++ b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_low_bucket") { + sql "set enable_spill=false" // spill will cause rf not_ready sql "set runtime_filter_wait_infinitely = true" sql "set parallel_pipeline_task_num = 4" From 64d8d094949952d2274fee5b50fac53d5d3771e0 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Sat, 18 Jan 2025 00:12:17 +0800 Subject: [PATCH 3/5] fix case --- .../suites/query_p0/join/test_low_bucket/test_low_bucket.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy index db5b5ae13dcb1c..9bac6dc2469512 100644 --- a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy +++ b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy @@ -16,7 +16,7 @@ // under the License. 
suite("test_low_bucket") { - sql "set enable_spill=false" // spill will cause rf not_ready + sql "set enable_join_spill = false" // spill will cause rf not_ready sql "set runtime_filter_wait_infinitely = true" sql "set parallel_pipeline_task_num = 4" From af145757b80d7f6487d8758da52ecfb0dc17adc7 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 20 Jan 2025 13:12:20 +0800 Subject: [PATCH 4/5] update --- be/src/exprs/runtime_filter.cpp | 41 ++++++++++++++------------- be/src/runtime/runtime_filter_mgr.cpp | 10 ------- be/src/runtime/runtime_filter_mgr.h | 1 - 3 files changed, 22 insertions(+), 30 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b46e654f6b1188..abde8bcf90b013 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -323,25 +323,28 @@ Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { return Status::OK(); }; auto do_merge = [&]() { - if (auto* local_merge_filters = - _state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _filter_id); - local_merge_filters) { - local_merge_filters->merge_watcher.start(); - std::lock_guard l(*local_merge_filters->lock); - RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get())); - local_merge_filters->merge_time--; - local_merge_filters->merge_watcher.stop(); - if (local_merge_filters->merge_time == 0) { - if (_has_local_target) { - RETURN_IF_ERROR(send_to_local_targets( - local_merge_filters->filters[0]->_wrapper, true, - local_merge_filters->merge_watcher.elapsed_time())); - } else { - RETURN_IF_ERROR(send_to_remote_targets( - local_merge_filters->filters[0].get(), - local_merge_filters->merge_watcher.elapsed_time())); - } + if (_has_local_target && + _state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { + // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless + return Status::OK(); + } + LocalMergeFilters* 
local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( + _filter_id, &local_merge_filters)); + local_merge_filters->merge_watcher.start(); + std::lock_guard l(*local_merge_filters->lock); + RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get())); + local_merge_filters->merge_time--; + local_merge_filters->merge_watcher.stop(); + if (local_merge_filters->merge_time == 0) { + if (_has_local_target) { + RETURN_IF_ERROR( + send_to_local_targets(local_merge_filters->filters[0]->_wrapper, true, + local_merge_filters->merge_watcher.elapsed_time())); + } else { + RETURN_IF_ERROR( + send_to_remote_targets(local_merge_filters->filters[0].get(), + local_merge_filters->merge_watcher.elapsed_time())); } } return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index f43b037fc19e3d..ccc1ae23d65e71 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -170,16 +170,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( return Status::OK(); } -doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id) { - DCHECK(_is_global); - std::lock_guard l(_lock); - auto iter = _local_merge_producer_map.find(filter_id); - if (iter == _local_merge_producer_map.end()) { - return nullptr; - } - return &iter->second; -} - Status RuntimeFilterMgr::register_producer_filter( const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr* producer_filter) { diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index c54be905f28f08..e9aba249064c48 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -107,7 +107,6 @@ class RuntimeFilterMgr { std::shared_ptr producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); - LocalMergeFilters* 
get_local_merge_producer_filters(int filter_id); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr* producer_filter); From 6dde0529c94f33b36b38d188c81671dbc847b8c2 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 20 Jan 2025 13:18:02 +0800 Subject: [PATCH 5/5] update --- be/src/exprs/runtime_filter.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index abde8bcf90b013..2a545a7e2237a8 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -323,6 +323,9 @@ Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { return Status::OK(); }; auto do_merge = [&]() { + // two cases where we need to do a local merge: + // 1. has remote target + // 2. has local target and has global consumer (means target scan has local shuffle) if (_has_local_target && _state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless @@ -420,9 +423,14 @@ class SyncSizeClosure : public AutoReleaseClosureglobal_runtime_filter_mgr()->get_local_merge_producer_filters(_filter_id); - local_merge_filters) { + // two cases where we need to do a local merge: + // 1. has remote target + // 2. has local target and has global consumer (means target scan has local shuffle) + if (_has_remote_target || + !_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( + _filter_id, &local_merge_filters)); std::lock_guard l(*local_merge_filters->lock); local_merge_filters->merge_size_times--; local_merge_filters->local_merged_size += local_filter_size;