From e566eecca6bd9aa41643c75a06c7a2b7902432f9 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 3 Jul 2024 11:18:39 +0800 Subject: [PATCH 1/6] [pipeline](datagen) Improve datagen operator parallelism --- be/src/pipeline/exec/datagen_operator.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index 8aeeea2a699824..a1ac1cc4a23846 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -56,6 +56,7 @@ class DataGenSourceOperatorX final : public OperatorX { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } + bool ignore_data_distribution() const override { return true; } private: friend class DataGenLocalState; From 6f4469fa722b25d5e8371820216087bd086413c7 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 4 Jul 2024 12:30:09 +0800 Subject: [PATCH 2/6] [pipeline](datagen) Improve datagen operator parallelism --- be/src/pipeline/exec/datagen_operator.h | 1 - be/src/pipeline/pipeline_fragment_context.cpp | 19 +++++++++++++++++++ .../apache/doris/planner/DataGenScanNode.java | 9 +++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index a1ac1cc4a23846..8aeeea2a699824 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -56,7 +56,6 @@ class DataGenSourceOperatorX final : public OperatorX { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } - bool ignore_data_distribution() const override { return true; } private: friend class DataGenLocalState; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0968de7951ed84..04e1193c66e5eb 100644 --- 
a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -833,6 +833,7 @@ Status PipelineFragmentContext::_add_local_exchange( const std::map& shuffle_idx_to_instance_idx, const bool ignore_data_distribution) { DCHECK(_enable_local_shuffle()); + LOG(WARNING) << "======2 " << _num_instances; if (_num_instances <= 1) { return Status::OK(); } @@ -840,6 +841,7 @@ Status PipelineFragmentContext::_add_local_exchange( if (!cur_pipe->need_to_local_exchange(data_distribution)) { return Status::OK(); } + LOG(WARNING) << "======3 " << _num_instances; *do_local_exchange = true; auto& operator_xs = cur_pipe->operator_xs(); @@ -870,6 +872,19 @@ Status PipelineFragmentContext::_plan_local_exchange( const std::map& shuffle_idx_to_instance_idx) { for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) { _pipelines[pip_idx]->init_data_distribution(); + LOG(WARNING) << "======1 " + << get_exchange_type_name(_pipelines[pip_idx] + ->operator_xs() + .front() + ->required_data_distribution() + .distribution_type) + << " " + << get_exchange_type_name(_pipelines[pip_idx] + ->sink_x() + ->required_data_distribution() + .distribution_type) + << " " << _num_instances; + ; // Set property if child pipeline is not join operator's child. 
if (!_pipelines[pip_idx]->children().empty()) { for (auto& child : _pipelines[pip_idx]->children()) { @@ -1433,6 +1448,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 60fce4df14848f..995a7f2b1fb9e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.DataGenTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionTask; @@ -122,6 +123,14 @@ public boolean needToCheckColumnPriv() { // by multi-processes or multi-threads. So we assign instance number to 1. 
@Override public int getNumInstances() { + if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } + return 1; + } + + @Override + public int getScanRangeNum() { return 1; } From b86b2ad2fa3130d9c9b79d2fea361befa3385d55 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 4 Jul 2024 12:31:34 +0800 Subject: [PATCH 3/6] update --- be/src/pipeline/pipeline_fragment_context.cpp | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 04e1193c66e5eb..5e08c22b1700e0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -833,7 +833,6 @@ Status PipelineFragmentContext::_add_local_exchange( const std::map& shuffle_idx_to_instance_idx, const bool ignore_data_distribution) { DCHECK(_enable_local_shuffle()); - LOG(WARNING) << "======2 " << _num_instances; if (_num_instances <= 1) { return Status::OK(); } @@ -841,7 +840,6 @@ Status PipelineFragmentContext::_add_local_exchange( if (!cur_pipe->need_to_local_exchange(data_distribution)) { return Status::OK(); } - LOG(WARNING) << "======3 " << _num_instances; *do_local_exchange = true; auto& operator_xs = cur_pipe->operator_xs(); @@ -872,19 +870,6 @@ Status PipelineFragmentContext::_plan_local_exchange( const std::map& shuffle_idx_to_instance_idx) { for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) { _pipelines[pip_idx]->init_data_distribution(); - LOG(WARNING) << "======1 " - << get_exchange_type_name(_pipelines[pip_idx] - ->operator_xs() - .front() - ->required_data_distribution() - .distribution_type) - << " " - << get_exchange_type_name(_pipelines[pip_idx] - ->sink_x() - ->required_data_distribution() - .distribution_type) - << " " << _num_instances; - ; // Set property if child pipeline is not join operator's child. 
if (!_pipelines[pip_idx]->children().empty()) { for (auto& child : _pipelines[pip_idx]->children()) { From 30194516197f6de510c26cb0f9dd892928db2d75 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 4 Jul 2024 15:10:33 +0800 Subject: [PATCH 4/6] update --- be/src/pipeline/exec/datagen_operator.cpp | 4 ++-- regression-test/conf/regression-conf.groovy | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 541e882667f583..48e428ceef42cf 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -87,8 +87,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. for (const auto& filter_desc : p._runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(), - &runtime_filter)); + RETURN_IF_ERROR(state->register_consumer_runtime_filter( + filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 6d4d915633914b..481e536d987e80 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -84,7 +84,7 @@ testDirectories = "" // this groups will not be executed excludeGroups = "" // this suites will not be executed -excludeSuites = "test_broker_load" +excludeSuites = "test_broker_load,test_iot_auto_detect_concurrent" // this directories will not be executed excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line as the first line "segcompaction_p2," + From 524adcd61ad12787ed0638f89cc4428d354aa8ce Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 
4 Jul 2024 15:30:09 +0800 Subject: [PATCH 5/6] update --- .../suites/correctness_p0/test_assert_row_num.groovy | 2 +- .../suites/external_table_p0/tvf/test_numbers.groovy | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy index 818213f56fee89..68e9740a321ab3 100644 --- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy +++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy @@ -21,7 +21,7 @@ suite("test_assert_num_rows") { """ qt_sql_2 """ - SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL + SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number """ sql """ DROP TABLE IF EXISTS table_9_undef_undef; diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy index 44a956f93c60b5..3e1e038c0586e7 100644 --- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy @@ -39,17 +39,17 @@ qt_inner_join1 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number; + on a.number=b.number ORDER BY a.number,b.number; """ qt_inner_join2 """ select a.number as num1, b.number as num2 from numbers("number" = "6") a inner join numbers("number" = "6") b - on a.number>b.number; + on a.number>b.number ORDER BY a.number,b.number; """ qt_inner_join3 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number and b.number%2 = 0; + on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number; """ qt_left_join """ select a.number as num1, b.number as num2 From 
573ce0870fdc8eefe30b19ddfc0338c579d3d36e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 4 Jul 2024 18:55:16 +0800 Subject: [PATCH 6/6] update --- regression-test/conf/regression-conf.groovy | 2 +- .../pipeline/cloud_p1/conf/regression-conf-custom.groovy | 1 + regression-test/pipeline/p1/conf/regression-conf.groovy | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 481e536d987e80..6d4d915633914b 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -84,7 +84,7 @@ testDirectories = "" // this groups will not be executed excludeGroups = "" // this suites will not be executed -excludeSuites = "test_broker_load,test_iot_auto_detect_concurrent" +excludeSuites = "test_broker_load" // this directories will not be executed excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line as the first line "segcompaction_p2," + diff --git a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy index 42a18b7f22e64e..1b7c4f4c07c725 100644 --- a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy @@ -9,6 +9,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line as the first line diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy index d4ecd55d38f4a3..f85892e6834155 100644 --- a/regression-test/pipeline/p1/conf/regression-conf.groovy +++ 
b/regression-test/pipeline/p1/conf/regression-conf.groovy @@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line // this dir will not be executed