diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 541e882667f583..48e428ceef42cf 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -87,8 +87,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. for (const auto& filter_desc : p._runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(), - &runtime_filter)); + RETURN_IF_ERROR(state->register_consumer_runtime_filter( + filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0968de7951ed84..5e08c22b1700e0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1433,6 +1433,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 60fce4df14848f..995a7f2b1fb9e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.DataGenTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionTask; @@ -122,6 +123,14 @@ public boolean needToCheckColumnPriv() { // by multi-processes or multi-threads. So we assign instance number to 1. @Override public int getNumInstances() { + if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } + return 1; + } + + @Override + public int getScanRangeNum() { return 1; } diff --git a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy index 42a18b7f22e64e..1b7c4f4c07c725 100644 --- a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy @@ -9,6 +9,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line as the first line diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy index d4ecd55d38f4a3..f85892e6834155 100644 --- a/regression-test/pipeline/p1/conf/regression-conf.groovy +++ b/regression-test/pipeline/p1/conf/regression-conf.groovy @@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line // this dir will not be executed diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy index 818213f56fee89..68e9740a321ab3 100644 --- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy +++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy @@ -21,7 +21,7 @@ suite("test_assert_num_rows") { """ qt_sql_2 """ - SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL + SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number """ sql """ DROP TABLE IF EXISTS table_9_undef_undef; diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy index 44a956f93c60b5..3e1e038c0586e7 100644 --- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy @@ -39,17 +39,17 @@ qt_inner_join1 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number; + on a.number=b.number ORDER BY a.number,b.number; """ qt_inner_join2 """ select a.number as num1, b.number as num2 from numbers("number" = "6") a inner join numbers("number" = "6") b - on a.number>b.number; + on a.number>b.number ORDER BY a.number,b.number; """ qt_inner_join3 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number and b.number%2 = 0; + on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number; """ qt_left_join """ select a.number as num1, b.number as num2