diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 730337561e812a..704c256737a644 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p } AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -628,9 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index b3ffa19d6db791..3124a3981b47c7 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,7 +143,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { class AggSinkOperatorX final : public DataSinkOperatorX { public: AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", @@ -164,9 +164,11 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate && _require_bucket_distribution + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } + bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) { @@ -213,6 +215,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; RowDescriptor _agg_fn_output_row_descriptor; }; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index a1d3384edc6dde..5b4f5cee5cb016 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -193,14 +193,17 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { } AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}) {} + _require_bucket_distribution(require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 3ae4a7b5cff5ca..d974f68cefaf26 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -86,7 +86,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState { public: AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); @@ -102,13 +102,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return true; } + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); @@ -125,6 +127,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX _num_agg_input; const bool _is_colocate; + const bool _require_bucket_distribution; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 4fbe21f71d5e32..1f84bbf145a488 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. for (const auto& filter_desc : p._runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(), - &runtime_filter)); + RETURN_IF_ERROR(state->register_consumer_runtime_filter( + filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index c33b436ba03984..16c0df07b4961b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -374,7 +374,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct( DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : StatefulOperatorX(pool, tnode, operator_id, descs), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -382,9 +383,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _output_tuple_desc(nullptr), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) { + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index ca091f743bd8d9..d0b0d963ead125 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final : public StatefulOperatorX { public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; @@ -107,13 +107,15 @@ class DistinctStreamingAggOperatorX final DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } return StatefulOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } + private: friend class DistinctStreamingAggLocalState; TupleId _intermediate_tuple_id; @@ -125,6 +127,7 @@ class DistinctStreamingAggOperatorX final const bool _is_first_phase; const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::vector _aggregate_evaluators; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 2dab03d5a1916c..d445e2f309c373 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -167,6 +167,10 @@ class HashJoinBuildSinkOperatorX final bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } + bool require_data_distribution() const override { + return _join_distribution == TJoinDistributionType::COLOCATE || + _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; + } private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 5cdfe9feeb75ec..264f177bcc9c4b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -178,6 +178,10 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX(operator_id, tnode.node_id) { - _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 016869374bd921..d79ba6fd3d4f8f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -289,7 +289,7 @@ class PartitionedAggSinkLocalState class PartitionedAggSinkOperatorX : public DataSinkOperatorX { public: PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~PartitionedAggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", @@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _agg_sink_operator->require_data_distribution(); + } + Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _agg_sink_operator->set_child(child); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index db20efda67e005..b10c514b2f4d42 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -182,6 +182,9 @@ class PartitionedHashJoinProbeOperatorX final _inner_sink_operator = sink_operator; _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } private: Status _revoke_memory(RuntimeState* state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 82fe5eacd9445b..2fae1f15bfa569 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -129,6 +129,10 @@ class PartitionedHashJoinSinkOperatorX _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } + private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index d89e54614d14c6..61c35427e57987 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) { } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), @@ -85,7 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false), + _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index ad9c23401b4c69..f29d9bbde0944c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -74,7 +74,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState { class SortSinkOperatorX final : public DataSinkOperatorX { public: SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); @@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { @@ -96,6 +96,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } return DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; } @@ -128,6 +129,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { const bool _use_two_phase_read; const bool _merge_by_exchange; const bool _is_colocate = false; + const bool _require_bucket_distribution = false; const bool _is_analytic_sort = false; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index dfda2ff61e1022..92cd1f542d8b0c 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { } SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 9382edd6933978..fae5fe3270f3a0 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -64,7 +64,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX::_name); @@ -78,6 +78,9 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _sort_sink_operator->require_data_distribution(); + } Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _sort_sink_operator->set_child(child); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 1353e832e24d04..72e475f5461679 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -196,35 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest new_block_wrapper->unref(local_state._shared_state); } } - } else if (bucket_seq_to_instance_idx.empty()) { - /** - * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which - * means no scan operators is included in this fragment so we also need a `HASH_SHUFFLE` here. - */ - const auto& map = local_state._parent->cast() - ._shuffle_idx_to_instance_idx; - DCHECK(!map.empty()); - new_block_wrapper->ref(map.size()); - for (const auto& it : map) { - DCHECK(it.second >= 0 && it.second < _num_partitions) - << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = local_state._partition_rows_histogram[it.first]; - uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; - if (size > 0) { - local_state._shared_state->add_mem_usage( - it.second, new_block_wrapper->data_block.allocated_bytes(), false); - - if (!_enqueue_data_and_set_ready(it.second, local_state, - {new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->sub_mem_usage( - it.second, new_block_wrapper->data_block.allocated_bytes(), false); - new_block_wrapper->unref(local_state._shared_state); - } - } else { - new_block_wrapper->unref(local_state._shared_state); - } - } } else { + DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7d90cebc8d22d2..a0c31384b2253b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -800,15 +800,26 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( // 1. Create a new pipeline with local exchange sink. DataSinkOperatorXPtr sink; auto sink_id = next_sink_operator_id(); - const bool is_shuffled_hash_join = operator_xs.size() > idx - ? operator_xs[idx]->is_shuffled_hash_join() - : cur_pipe->sink_x()->is_shuffled_hash_join(); + /** + * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. + * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. + */ + const bool should_disable_bucket_shuffle = + bucket_seq_to_instance_idx.empty() || + (operator_xs.size() > idx ? operator_xs[idx]->is_shuffled_hash_join() + : cur_pipe->sink_x()->is_shuffled_hash_join()); sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances, + sink_id, local_exchange_id, + should_disable_bucket_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx)); + if (should_disable_bucket_shuffle && + data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; + } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - is_shuffled_hash_join, shuffle_idx_to_instance_idx)); + should_disable_bucket_shuffle, + shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. auto shared_state = LocalExchangeSharedState::create_shared(_num_instances); @@ -816,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - is_shuffled_hash_join ? _total_instances : _num_instances, + should_disable_bucket_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -1042,8 +1053,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty()) { - op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && !tnode.agg_node.grouping_exprs.empty()) { @@ -1067,14 +1081,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) { sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode, - descs)); + descs, _require_bucket_distribution)); } else { - sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } + _require_bucket_distribution = true; break; } case TPlanNodeType::HASH_JOIN_NODE: { @@ -1139,6 +1157,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1201,10 +1221,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; if (_runtime_state->enable_sort_spill()) { - sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } else { - sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1240,7 +1264,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1279,6 +1306,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { @@ -1301,6 +1332,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN print_plan_node_type(tnode.node_type)); } + _require_bucket_distribution = true; + return Status::OK(); } // NOLINTEND(readability-function-cognitive-complexity) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 55866400374d22..14e4b05d7e88ca 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -235,6 +235,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // Total instance num running on all BEs int _total_instances = -1; + + bool _require_bucket_distribution = false; }; } // namespace pipeline diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 14a50160d63435..f4e6dc93130d5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.DataGenTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionTask; @@ -116,6 +117,14 @@ public boolean needToCheckColumnPriv() { // by multi-processes or multi-threads. So we assign instance number to 1. @Override public int getNumInstances() { + if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } + return 1; + } + + @Override + public int getScanRangeNum() { return 1; } diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy index 8f8458e47a60c3..2a0156e16b4d4c 100644 --- a/regression-test/pipeline/p1/conf/regression-conf.groovy +++ b/regression-test/pipeline/p1/conf/regression-conf.groovy @@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line // this dir will not be executed diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy index 818213f56fee89..68e9740a321ab3 100644 --- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy +++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy @@ -21,7 +21,7 @@ suite("test_assert_num_rows") { """ qt_sql_2 """ - SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL + SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number """ sql """ DROP TABLE IF EXISTS table_9_undef_undef; diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy index 6f0f74f6433e2f..c0f2cafa403c5d 100644 --- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy @@ -39,17 +39,17 @@ order_qt_inner_join1 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number; + on a.number=b.number ORDER BY a.number,b.number; """ order_qt_inner_join2 """ select a.number as num1, b.number as num2 from numbers("number" = "6") a inner join numbers("number" = "6") b - on a.number>b.number; + on a.number>b.number ORDER BY a.number,b.number; """ order_qt_inner_join3 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number and b.number%2 = 0; + on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number; """ order_qt_left_join """ select a.number as num1, b.number as num2 diff --git a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy index c66131b57dcfc9..29fe192e2b5368 100644 --- a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy +++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy @@ -16,6 +16,10 @@ // under the License. suite("test_join_local_shuffle", "query,p0") { + sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;" sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" sql """ @@ -72,7 +76,7 @@ suite("test_join_local_shuffle", "query,p0") { sql "insert into test_join_local_shuffle_2 values(2, 0);" sql "insert into test_join_local_shuffle_3 values(2, 0);" sql "insert into test_join_local_shuffle_4 values(0, 1);" - qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; " + qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t join [shuffle] test_join_local_shuffle_2 on t.c1 = test_join_local_shuffle_2.c2; " sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"