From bc5289d7dfda76aace5ed2ee9c3b6c5135f217c0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 14 Dec 2023 16:47:17 +0800 Subject: [PATCH 1/4] [pipelineX](improvement) Support global rf --- be/src/exprs/runtime_filter.cpp | 58 ++++++-- be/src/exprs/runtime_filter.h | 16 ++- be/src/exprs/runtime_filter_slots.h | 12 +- .../pipeline/exec/aggregation_sink_operator.h | 10 +- .../exec/aggregation_source_operator.cpp | 5 +- be/src/pipeline/exec/analytic_sink_operator.h | 11 +- .../pipeline/exec/assert_num_rows_operator.h | 4 +- .../pipeline/exec/exchange_source_operator.h | 7 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 9 +- be/src/pipeline/exec/hashjoin_build_sink.h | 18 +-- .../pipeline/exec/hashjoin_probe_operator.h | 12 +- .../exec/nested_loop_join_build_operator.h | 8 ++ .../exec/nested_loop_join_probe_operator.h | 6 +- .../exec/partition_sort_sink_operator.h | 6 +- be/src/pipeline/exec/scan_operator.cpp | 6 +- be/src/pipeline/exec/scan_operator.h | 15 +- .../pipeline/exec/set_probe_sink_operator.h | 6 +- be/src/pipeline/exec/set_sink_operator.h | 6 +- be/src/pipeline/exec/sort_sink_operator.h | 6 +- .../streaming_aggregation_sink_operator.h | 4 +- be/src/pipeline/pipeline.cpp | 1 + be/src/pipeline/pipeline.h | 38 ++--- be/src/pipeline/pipeline_x/dependency.cpp | 1 + be/src/pipeline/pipeline_x/dependency.h | 28 ++++ .../local_exchange_sink_operator.cpp | 10 ++ .../local_exchange_sink_operator.h | 5 +- .../local_exchange_source_operator.cpp | 10 ++ .../local_exchange_source_operator.h | 9 ++ .../local_exchange/local_exchanger.cpp | 41 +++++- .../local_exchange/local_exchanger.h | 40 +++++- be/src/pipeline/pipeline_x/operator.cpp | 4 +- be/src/pipeline/pipeline_x/operator.h | 28 ++-- .../pipeline_x_fragment_context.cpp | 131 +++++++----------- .../pipeline_x/pipeline_x_fragment_context.h | 14 +- be/src/runtime/runtime_filter_mgr.cpp | 24 +++- be/src/runtime/runtime_filter_mgr.h | 5 +- be/src/vec/exec/runtime_filter_consumer.cpp | 16 ++- be/src/vec/exec/runtime_filter_consumer.h | 4 +- be/src/vec/sink/vdata_stream_sender.cpp | 18 +++ be/src/vec/sink/vdata_stream_sender.h | 8 +- .../post/RuntimeFilterGenerator.java | 4 + .../java/org/apache/doris/qe/Coordinator.java | 4 +- .../org/apache/doris/qe/SessionVariable.java | 2 +- 43 files changed, 451 insertions(+), 219 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 7221ba1a972f7c..d13aa71c4e6e0a 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -889,13 +889,21 @@ class RuntimePredicateWrapper { return Status::OK(); } - PrimitiveType column_type() { return _column_return_type; } + PrimitiveType column_type() { + return _column_return_type; + } - bool is_bloomfilter() const { return _is_bloomfilter; } + bool is_bloomfilter() const { + return _is_bloomfilter; + } - bool is_ignored_in_filter() const { return _is_ignored_in_filter; } + bool is_ignored_in_filter() const { + return _is_ignored_in_filter; + } - std::string* get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } + std::string* get_ignored_in_filter_msg() const { + return _ignored_in_filter_msg; + } void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr& _hybrid_set, @@ -906,7 +914,9 @@ class RuntimePredicateWrapper { } } - size_t get_in_filter_size() const { return _context.hybrid_set->size(); } + size_t get_in_filter_size() const { + return _context.hybrid_set->size(); + } std::shared_ptr get_bitmap_filter() const { return _context.bitmap_filter_func; @@ -946,10 +956,11 @@ class RuntimePredicateWrapper { Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, - bool build_bf_exactly) { - *res = pool->add(new IRuntimeFilter(state, pool, desc)); + bool build_bf_exactly, bool is_global, int parallel_tasks) { + *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, parallel_tasks)); (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id, + is_global ? false : build_bf_exactly); } void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { @@ -972,9 +983,35 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta _wrapper->insert_batch(column, start); } +Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) { + SCOPED_TIMER(_merge_local_rf_timer); + std::unique_lock lock(_local_merge_mutex); + if (_merged_rf_num == 0) { + _wrapper = wrapper; + } else { + RETURN_IF_ERROR(merge_from(wrapper)); + } + *merged_num = ++_merged_rf_num; + return Status::OK(); +} + Status IRuntimeFilter::publish() { DCHECK(is_producer()); - if (_has_local_target) { + if (_is_global) { + std::vector filters; + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters( + _filter_id, filters)); + // push down + for (auto filter : filters) { + int merged_num = 0; + RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num)); + if (merged_num == _parallel_build_tasks) { + filter->update_runtime_filter_type_to_profile(); + filter->signal(); + } + } + return Status::OK(); + } else if (_has_local_target) { std::vector filters; RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); // push down @@ -1297,6 +1334,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { _profile_init = true; parent_profile->add_child(_profile.get(), true, nullptr); _profile->add_info_string("Info", _format_status()); + if (_is_global) { + _merge_local_rf_timer = ADD_TIMER(_profile.get(), "MergeLocalRuntimeFilterTime"); + } if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { update_runtime_filter_type_to_profile(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 17660810520f11..5353f7da97ee2e 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -186,7 +186,7 @@ enum RuntimeFilterState { class IRuntimeFilter { public: IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool, - const TRuntimeFilterDesc* desc) + const TRuntimeFilterDesc* desc, bool is_global = false, int parallel_tasks = -1) : _state(state), _pool(pool), _filter_id(desc->filter_id), @@ -206,14 +206,17 @@ class IRuntimeFilter { _runtime_filter_type(get_runtime_filter_type(desc)), _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, to_string(_runtime_filter_type))), - _profile(new RuntimeProfile(_name)) {} + _profile(new RuntimeProfile(_name)), + _is_global(is_global), + _parallel_build_tasks(parallel_tasks) {} ~IRuntimeFilter() = default; static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, - bool build_bf_exactly = false); + bool build_bf_exactly = false, bool is_global = false, + int parallel_tasks = 0); void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); @@ -359,6 +362,8 @@ class IRuntimeFilter { void set_filter_timer(std::shared_ptr); + Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num); + protected: // serialize _wrapper to protobuf void to_protobuf(PInFilter* filter); @@ -452,7 +457,12 @@ class IRuntimeFilter { // parent profile // only effect on consumer std::unique_ptr _profile; + RuntimeProfile::Counter* _merge_local_rf_timer = nullptr; bool _opt_remote_rf; + const bool _is_global = false; + std::mutex _local_merge_mutex; + int _merged_rf_num = 0; + const int _parallel_build_tasks = -1; std::vector> _filter_timer; }; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 62cf0eab7d1915..7223b652a3c71e 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -34,8 +34,10 @@ class VRuntimeFilterSlots { public: VRuntimeFilterSlots( const std::vector>& build_expr_ctxs, - const std::vector& runtime_filter_descs) - : _build_expr_context(build_expr_ctxs), _runtime_filter_descs(runtime_filter_descs) {} + const std::vector& runtime_filter_descs, bool is_global = false) + : _build_expr_context(build_expr_ctxs), + _runtime_filter_descs(runtime_filter_descs), + _is_global(is_global) {} Status init(RuntimeState* state, int64_t hash_table_size) { // runtime filter effect strategy @@ -45,7 +47,10 @@ class VRuntimeFilterSlots { std::map has_in_filter; - auto ignore_local_filter = [state](int filter_id) { + auto ignore_local_filter = [&](int filter_id) { + if (_is_global) { + return Status::OK(); + } std::vector filters; RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, filters)); if (filters.empty()) { @@ -236,6 +241,7 @@ class VRuntimeFilterSlots { private: const std::vector>& _build_expr_context; const std::vector& _runtime_filter_descs; + const bool _is_global = false; // prob_contition index -> [IRuntimeFilter] std::map> _runtime_filters; }; diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 639687ec74d09f..b7966724ebd929 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -366,15 +366,15 @@ class AggSinkOperatorX : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_probe_expr_ctxs.empty()) { return _needs_finalize || DataSinkOperatorX::_child_x ->ignore_data_distribution() - ? ExchangeType::PASSTHROUGH - : ExchangeType::NOOP; + ? DataDistribution(ExchangeType::PASSTHROUGH) + : DataDistribution(ExchangeType::NOOP); } - return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } using DataSinkOperatorX::id; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 765fdec3daff10..e80f1e3d5e8301 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -506,8 +506,9 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B if (!column_type->is_nullable() || data_types[i]->is_nullable() || !remove_nullable(column_type)->equals(*data_types[i])) { return Status::InternalError( - "column_type not match data_types, column_type={}, data_types={}", - column_type->get_name(), data_types[i]->get_name()); + "node id = {}, column_type not match data_types, column_type={}, " + "data_types={}", + _parent->node_id(), column_type->get_name(), data_types[i]->get_name()); } } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 8fa1c2e1ea37d0..c938a74e8560c4 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -107,14 +107,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_partition_by_eq_expr_ctxs.empty()) { - return ExchangeType::PASSTHROUGH; + return {ExchangeType::PASSTHROUGH}; } else if (_order_by_eq_expr_ctxs.empty()) { - return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + return _is_colocate + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } - return ExchangeType::NOOP; + return {ExchangeType::NOOP}; } private: diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 060c5be7e5f9f4..1e796b622dc9e7 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -57,7 +57,9 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX { return _sub_plan_query_statistics_recvr; } - bool need_to_local_shuffle() const override { - return !_is_hash_partition || OperatorX::ignore_data_distribution(); + DataDistribution get_local_exchange_type() const override { + if (!_is_hash_partition || OperatorX::ignore_data_distribution()) { + return {ExchangeType::NOOP}; + } + return {ExchangeType::HASH_SHUFFLE}; } private: diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index b8ae2ca9666b11..c3238df035edff 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -103,7 +103,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1)); + p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1, + p._use_global_rf, p._child_x->parallel_tasks())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( p._runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } @@ -386,12 +387,14 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool use_global_rf) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type : TJoinDistributionType::NONE), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && - tnode.hash_join_node.is_broadcast_join) { + tnode.hash_join_node.is_broadcast_join), + _use_global_rf(use_global_rf) { _runtime_filter_descs = tnode.runtime_filters; } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 580c2bb8ff9f20..b2fbec8575ea6c 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -135,7 +135,7 @@ class HashJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX { public: HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool use_global_rf); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", JoinBuildSinkOperatorX::_name); @@ -155,18 +155,18 @@ class HashJoinBuildSinkOperatorX final ._should_build_hash_table; } - std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return ExchangeType::NOOP; + return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { - return _child_x->ignore_data_distribution() ? ExchangeType::BROADCAST - : ExchangeType::NOOP; + return _child_x->ignore_data_distribution() + ? DataDistribution(ExchangeType::PASS_TO_ONE) + : DataDistribution(ExchangeType::NOOP); } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? ExchangeType::BUCKET_HASH_SHUFFLE - : ExchangeType::HASH_SHUFFLE; + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } private: @@ -187,6 +187,8 @@ class HashJoinBuildSinkOperatorX final vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; std::vector _runtime_filter_descs; std::vector _partition_exprs; + + const bool _use_global_rf; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 5ea66375ea029f..16b455e4f6cf02 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -163,17 +163,17 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return ExchangeType::NOOP; + return {ExchangeType::NOOP}; } return _is_broadcast_join - ? ExchangeType::PASSTHROUGH + ? DataDistribution(ExchangeType::PASSTHROUGH) : (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? ExchangeType::BUCKET_HASH_SHUFFLE - : ExchangeType::HASH_SHUFFLE); + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); } private: diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 9d7b8821c90469..daa976b4e78871 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -102,6 +102,14 @@ class NestedLoopJoinBuildSinkOperatorX final Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + DataDistribution get_local_exchange_type() const override { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + return {ExchangeType::NOOP}; + } + return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST) + : DataDistribution(ExchangeType::NOOP); + } + private: friend class NestedLoopJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 77652f81c18d97..5e57399eae81d2 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -227,11 +227,11 @@ class NestedLoopJoinProbeOperatorX final return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; } - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return ExchangeType::NOOP; + return {ExchangeType::NOOP}; } - return ExchangeType::ADAPTIVE_PASSTHROUGH; + return {ExchangeType::ADAPTIVE_PASSTHROUGH}; } const RowDescriptor& row_desc() override { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index ecbbad5c659d35..81bc6b9a3fdf98 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -105,11 +105,11 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); - RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); + RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.ignore_data_distribution())); _scan_dependency = ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), PipelineXLocalState<>::_parent->node_id(), @@ -161,6 +161,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) _wait_for_scanner_done_timer = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, "WaitForScannerDone", timer_name, 1); _wait_for_eos_timer = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, "WaitForEos", timer_name, 1); + _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); return Status::OK(); } @@ -1445,6 +1446,7 @@ Status ScanLocalState::close(RuntimeState* state) { return Status::OK(); } COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time()); SCOPED_TIMER(_close_timer); SCOPED_TIMER(exec_time_counter()); @@ -1453,6 +1455,8 @@ Status ScanLocalState::close(RuntimeState* state) { } COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); + return PipelineXLocalState<>::close(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 78cb399427e787..9bc42453c79d97 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -171,8 +171,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr; // time of prefilter input block from scanner RuntimeProfile::Counter* _wait_for_eos_timer = nullptr; - RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; + RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; }; template @@ -434,14 +434,15 @@ class ScanOperatorX : public OperatorX { TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } - bool need_to_local_shuffle() const override { - // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. - // 2. `ignore_data_distribution()` returns true means we ignore the distribution. - return _col_distribute_ids.empty() || OperatorX::ignore_data_distribution(); + DataDistribution get_local_exchange_type() const override { + if (_col_distribute_ids.empty() || OperatorX::ignore_data_distribution()) { + // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. + // 2. `ignore_data_distribution()` returns true means we ignore the distribution. + return {ExchangeType::NOOP}; + } + return {ExchangeType::BUCKET_HASH_SHUFFLE}; } - bool is_bucket_shuffle_scan() const override { return !_col_distribute_ids.empty(); } - int64_t get_push_down_count() const { return _push_down_count; } using OperatorX::id; using OperatorX::operator_id; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index dfa27ebd3ffd89..a86bf4917219d1 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -144,9 +144,9 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { - return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + DataDistribution get_local_exchange_type() const override { + return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } private: diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 4319ec8f4334bb..375906b5aa3c41 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -129,9 +129,9 @@ class SetSinkOperatorX final : public DataSinkOperatorX get_local_shuffle_exprs() const override { return _partition_exprs; } - ExchangeType get_local_exchange_type() const override { - return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + DataDistribution get_local_exchange_type() const override { + return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } private: diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 3f44c118c8f22c..c8a4dda7795bde 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -93,12 +93,12 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - ExchangeType get_local_exchange_type() const override { + DataDistribution get_local_exchange_type() const override { if (_merge_by_exchange) { // The current sort node is used for the ORDER BY - return ExchangeType::PASSTHROUGH; + return {ExchangeType::PASSTHROUGH}; } - return ExchangeType::NOOP; + return {ExchangeType::NOOP}; } private: diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index b0327e71cd151c..ef7f71b7e2920b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -120,7 +120,9 @@ class StreamingAggSinkOperatorX final : public AggSinkOperatorXset_parallel_tasks(num_tasks()); operatorXs.emplace_back(op); if (op->is_source()) { std::reverse(operatorXs.begin(), operatorXs.end()); diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 832a3f51e3c98e..1b123cb5ab4494 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -30,14 +30,9 @@ #include "pipeline/pipeline_x/operator.h" #include "util/runtime_profile.h" -namespace doris { -namespace pipeline { -class PipelineFragmentContext; -} // namespace pipeline -} // namespace doris - namespace doris::pipeline { +class PipelineFragmentContext; class Pipeline; using PipelinePtr = std::shared_ptr; @@ -127,13 +122,20 @@ class Pipeline : public std::enable_shared_from_this { return _collect_query_statistics_with_every_batch; } - bool need_to_local_shuffle() const { return _need_to_local_shuffle; } - void set_need_to_local_shuffle(bool need_to_local_shuffle) { - _need_to_local_shuffle = need_to_local_shuffle; + bool need_to_local_shuffle(const DataDistribution target_data_distribution) const { + if (target_data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE || + target_data_distribution.distribution_type == ExchangeType::HASH_SHUFFLE) { + return target_data_distribution.operator!=(_data_distribution); + } + return true; } - void init_need_to_local_shuffle_by_source() { - set_need_to_local_shuffle(operatorXs.front()->need_to_local_shuffle()); + void init_data_distribution() { + set_data_distribution(operatorXs.front()->get_local_exchange_type()); } + void set_data_distribution(const DataDistribution& data_distribution) { + _data_distribution = data_distribution; + } + const DataDistribution& data_distribution() const { return _data_distribution; } std::vector>& children() { return _children; } void set_children(std::shared_ptr child) { _children.push_back(child); } @@ -141,15 +143,19 @@ class Pipeline : public std::enable_shared_from_this { void incr_created_tasks() { _num_tasks_created++; } bool need_to_create_task() const { return _num_tasks > _num_tasks_created; } - void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; } + void set_num_tasks(int num_tasks) { + _num_tasks = num_tasks; + for (auto& op : operatorXs) { + op->set_parallel_tasks(_num_tasks); + } + } int num_tasks() const { return _num_tasks; } std::string debug_string() { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}, " - "_need_to_local_shuffle: {}]", - _pipeline_id, _num_tasks, _num_tasks_created, _need_to_local_shuffle); + "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id, + _num_tasks, _num_tasks_created); for (size_t i = 0; i < operatorXs.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}", operatorXs[i]->debug_string(i)); } @@ -217,7 +223,7 @@ class Pipeline : public std::enable_shared_from_this { // 2. is exchange operator with Hash/BucketHash partition // then set `_need_to_local_shuffle` to false which means we should use local shuffle in this fragment // because data already be partitioned by storage/shuffling. - bool _need_to_local_shuffle = true; + DataDistribution _data_distribution {ExchangeType::NOOP}; // How many tasks should be created ? int _num_tasks = 1; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 103d07c68ca499..86fe982343a427 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -169,6 +169,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { void RuntimeFilterDependency::sub_filters() { auto value = _filters.fetch_sub(1); if (value == 1) { + _watcher.stop(); std::vector local_block_task {}; { std::unique_lock lc(_task_lock); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 0aebc472637ab5..3070693f1d4ade 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -566,6 +566,7 @@ enum class ExchangeType : uint8_t { BUCKET_HASH_SHUFFLE = 3, BROADCAST = 4, ADAPTIVE_PASSTHROUGH = 5, + PASS_TO_ONE = 6, }; inline std::string get_exchange_type_name(ExchangeType idx) { @@ -582,11 +583,38 @@ inline std::string get_exchange_type_name(ExchangeType idx) { return "BROADCAST"; case ExchangeType::ADAPTIVE_PASSTHROUGH: return "ADAPTIVE_PASSTHROUGH"; + case ExchangeType::PASS_TO_ONE: + return "PASS_TO_ONE"; } LOG(FATAL) << "__builtin_unreachable"; __builtin_unreachable(); } +struct DataDistribution { + DataDistribution(ExchangeType type) : distribution_type(type) {} + DataDistribution(ExchangeType type, const std::vector& partition_exprs_) + : distribution_type(type), partition_exprs(partition_exprs_) {} + DataDistribution(const DataDistribution& other) + : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {} + bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } + bool operator==(const DataDistribution& other) const { + if (distribution_type == other.distribution_type && + (distribution_type == ExchangeType::HASH_SHUFFLE || + distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) && + (partition_exprs.empty() || other.partition_exprs.empty())) { + return true; + } + return distribution_type == other.distribution_type && + partition_exprs == other.partition_exprs; + } + DataDistribution operator=(const DataDistribution& other) const { + return DataDistribution(other.distribution_type, other.partition_exprs); + } + bool operator!=(const DataDistribution& other) const { return !operator==(other); } + ExchangeType distribution_type; + const std::vector partition_exprs; +}; + class Exchanger; struct LocalExchangeSharedState : public BasicSharedState { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index b950febfead54f..7c8c18be61cc9f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -40,6 +40,16 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo return Status::OK(); } +std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}", + Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, + _exchanger->_num_senders, _exchanger->_num_sources, + _exchanger->_running_sink_operators); + return fmt::to_string(debug_string_buffer); +} + Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 0f2339410c8a76..87afb2b098deff 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -34,6 +34,7 @@ class Exchanger; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; +class PassToOneExchanger; class LocalExchangeSinkOperatorX; class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState { @@ -46,8 +47,7 @@ class LocalExchangeSinkLocalState final ~LocalExchangeSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - - int get_data_queue_idx() const; + std::string debug_string(int indentation_level) const override; private: friend class LocalExchangeSinkOperatorX; @@ -55,6 +55,7 @@ class LocalExchangeSinkLocalState final friend class BucketShuffleExchanger; friend class PassthroughExchanger; friend class BroadcastExchanger; + friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; Exchanger* _exchanger = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 5d705fe9b8f478..c1933f93c716fa 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -51,6 +51,16 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& return Status::OK(); } +std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}", + Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, + _exchanger->_num_senders, _exchanger->_num_sources, + _exchanger->_running_sink_operators); + return fmt::to_string(debug_string_buffer); +} + Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 6d0e375a494fe5..63d71bbe08be17 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -36,6 +36,7 @@ class Exchanger; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; +class PassToOneExchanger; class LocalExchangeSourceOperatorX; class LocalExchangeSourceLocalState final : public PipelineXLocalState { @@ -46,12 +47,14 @@ class LocalExchangeSourceLocalState final : Base(state, parent) {} Status init(RuntimeState* state, LocalStateInfo& info) override; + std::string debug_string(int indentation_level) const override; private: friend class LocalExchangeSourceOperatorX; friend class ShuffleExchanger; friend class PassthroughExchanger; friend class BroadcastExchanger; + friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; Exchanger* _exchanger = nullptr; @@ -66,6 +69,7 @@ class LocalExchangeSourceOperatorX final : public OperatorXunref(local_state._shared_state); } } - } else if (_num_senders != _num_sources) { + } else if (_num_senders != _num_sources || _ignore_source_data_distribution) { for (size_t i = 0; i < _num_partitions; i++) { size_t start = local_state._partition_rows_histogram[i]; size_t size = local_state._partition_rows_histogram[i + 1] - start; @@ -138,7 +138,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest local_state._shared_state->add_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(i); + local_state._shared_state->set_ready_to_read(i % _num_sources); } else { new_block_wrapper->unref(local_state._shared_state); } @@ -204,7 +204,7 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b return Status::OK(); } -Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, +Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); @@ -215,7 +215,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block return Status::OK(); } -Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, +Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, LocalExchangeSourceLocalState& local_state) { if (local_state._channel_id != 0) { @@ -239,6 +239,39 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state, + LocalExchangeSinkLocalState& local_state) { + for (size_t i = 0; i < _num_partitions; i++) { + auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); + mutable_block->add_rows(in_block, 0, in_block->rows()); + _data_queue[i].enqueue(mutable_block->to_block()); + local_state._shared_state->set_ready_to_read(i); + } + + return Status::OK(); +} + +Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state, + LocalExchangeSourceLocalState& local_state) { + vectorized::Block next_block; + if (_running_sink_operators == 0) { + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + *block = std::move(next_block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + source_state = SourceState::FINISHED; + } + } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + *block = std::move(next_block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return Status::OK(); +} + Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 635e7e1cb32e43..ff64c88d5aafe1 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -50,6 +50,8 @@ class Exchanger { friend struct LocalExchangeSourceDependency; friend struct LocalExchangeSharedState; friend struct ShuffleBlockWrapper; + friend class LocalExchangeSourceLocalState; + friend class LocalExchangeSinkLocalState; std::atomic _running_sink_operators = 0; const int _num_partitions; const int _num_senders; @@ -86,10 +88,6 @@ class ShuffleExchanger : public Exchanger { : Exchanger(running_sink_operators, num_partitions) { _data_queue.resize(num_partitions); } - ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions) - : Exchanger(running_sink_operators, num_sources, num_partitions) { - _data_queue.resize(num_partitions); - } ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, LocalExchangeSinkLocalState& local_state) override; @@ -99,17 +97,27 @@ class ShuffleExchanger : public Exchanger { ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: + ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, + bool ignore_source_data_distribution) + : Exchanger(running_sink_operators, num_sources, num_partitions), + _ignore_source_data_distribution(ignore_source_data_distribution) { + _data_queue.resize(num_partitions); + } Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, SourceState source_state, LocalExchangeSinkLocalState& local_state); std::vector> _data_queue; + + const bool _ignore_source_data_distribution = false; }; class BucketShuffleExchanger : public ShuffleExchanger { ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); - BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions) - : ShuffleExchanger(running_sink_operators, num_sources, num_partitions) {} + BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, + bool ignore_source_data_distribution) + : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, + ignore_source_data_distribution) {} ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; @@ -133,6 +141,25 @@ class PassthroughExchanger final : public Exchanger { std::vector> _data_queue; }; +class PassToOneExchanger final : public Exchanger { +public: + ENABLE_FACTORY_CREATOR(PassToOneExchanger); + PassToOneExchanger(int running_sink_operators, int num_partitions) + : Exchanger(running_sink_operators, num_partitions) { + _data_queue.resize(num_partitions); + } + ~PassToOneExchanger() override = default; + Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + LocalExchangeSourceLocalState& local_state) override; + ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } + +private: + std::vector> _data_queue; +}; + class BroadcastExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); @@ -151,6 +178,7 @@ class BroadcastExchanger final : public Exchanger { private: std::vector> _data_queue; }; + //The code in AdaptivePassthroughExchanger is essentially // a copy of ShuffleExchanger and PassthroughExchanger. class AdaptivePassthroughExchanger : public Exchanger { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 55fa103e193bd2..d40f7f6adb3123 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -88,8 +88,8 @@ std::string PipelineXSinkLocalState::debug_string(int indentatio std::string OperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), - _op_name, node_id()); + fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}", + std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index d697970311de3e..fc95785924bfac 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -181,13 +181,17 @@ class OperatorXBase : public OperatorBase { } [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; - [[nodiscard]] virtual std::vector get_local_shuffle_exprs() const { return {}; } - [[nodiscard]] virtual ExchangeType get_local_exchange_type() const { + [[nodiscard]] virtual DataDistribution get_local_exchange_type() const { return _child_x && _child_x->ignore_data_distribution() && !is_source() - ? ExchangeType::PASSTHROUGH - : ExchangeType::NOOP; + ? DataDistribution(ExchangeType::PASSTHROUGH) + : DataDistribution(ExchangeType::NOOP); + } + [[nodiscard]] virtual bool ignore_data_distribution() const { + return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution; + } + [[nodiscard]] bool ignore_data_hash_distribution() const { + return _child_x ? _child_x->ignore_data_hash_distribution() : _ignore_data_distribution; } - [[nodiscard]] bool ignore_data_distribution() const { return _ignore_data_distribution; } void set_ignore_data_distribution() { _ignore_data_distribution = true; } Status prepare(RuntimeState* state) override; @@ -198,9 +202,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } - [[nodiscard]] virtual bool need_to_local_shuffle() const { return true; } - [[nodiscard]] virtual bool is_bucket_shuffle_scan() const { return false; } - bool can_read() override { LOG(FATAL) << "should not reach here!"; return false; @@ -278,6 +279,8 @@ class OperatorXBase : public OperatorBase { /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const; + void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } + int parallel_tasks() const { return _parallel_tasks; } protected: template @@ -303,6 +306,7 @@ class OperatorXBase : public OperatorBase { std::string _op_name; bool _ignore_data_distribution = false; + int _parallel_tasks = 0; }; template @@ -477,10 +481,10 @@ class DataSinkOperatorXBase : public OperatorBase { } virtual void get_dependency(std::vector& dependency, QueryContext* ctx) = 0; - virtual std::vector get_local_shuffle_exprs() const { return {}; } - virtual ExchangeType get_local_exchange_type() const { - return _child_x && _child_x->ignore_data_distribution() ? ExchangeType::PASSTHROUGH - : ExchangeType::NOOP; + virtual DataDistribution get_local_exchange_type() const { + return _child_x && _child_x->ignore_data_distribution() + ? DataDistribution(ExchangeType::PASSTHROUGH) + : DataDistribution(ExchangeType::NOOP); } Status close(RuntimeState* state) override { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 6cb666da41dd74..09faf87a09a50d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -254,43 +254,45 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r Status PipelineXFragmentContext::_plan_local_exchange( int num_buckets, const std::map& bucket_seq_to_instance_idx) { for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) { - _pipelines[pip_idx]->init_need_to_local_shuffle_by_source(); + _pipelines[pip_idx]->init_data_distribution(); // Set property if child pipeline is not join operator's child. if (!_pipelines[pip_idx]->children().empty()) { for (auto& child : _pipelines[pip_idx]->children()) { if (child->sink_x()->node_id() == _pipelines[pip_idx]->operator_xs().front()->node_id()) { - _pipelines[pip_idx]->set_need_to_local_shuffle( - _pipelines[pip_idx]->need_to_local_shuffle() && - child->need_to_local_shuffle()); + RETURN_IF_ERROR(_pipelines[pip_idx]->operator_xs().front()->set_child( + child->operator_xs().back())); + _pipelines[pip_idx]->set_data_distribution(child->data_distribution()); } } } RETURN_IF_ERROR(_plan_local_exchange( - _pipelines[pip_idx]->operator_xs().front()->ignore_data_distribution() + _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution() ? _num_instances : num_buckets, - pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx)); + pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx, + _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution())); } return Status::OK(); } Status PipelineXFragmentContext::_plan_local_exchange( int num_buckets, int pip_idx, PipelinePtr pip, - const std::map& bucket_seq_to_instance_idx) { - int idx = 0; + const std::map& bucket_seq_to_instance_idx, + const bool ignore_data_hash_distribution) { + int idx = 1; bool do_local_exchange = false; do { auto& ops = pip->operator_xs(); do_local_exchange = false; // Plan local exchange for each operator. for (; idx < ops.size();) { - if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) { + if (ops[idx]->get_local_exchange_type().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, - ops[idx]->get_local_shuffle_exprs(), ops[idx]->get_local_exchange_type(), - &do_local_exchange, num_buckets, bucket_seq_to_instance_idx)); + ops[idx]->get_local_exchange_type(), &do_local_exchange, num_buckets, + bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } if (do_local_exchange) { // If local exchange is needed for current operator, we will split this pipeline to @@ -303,11 +305,11 @@ Status PipelineXFragmentContext::_plan_local_exchange( idx++; } } while (do_local_exchange); - if (pip->sink_x()->get_local_exchange_type() != ExchangeType::NOOP) { + if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip, - pip->sink_x()->get_local_shuffle_exprs(), pip->sink_x()->get_local_exchange_type(), - &do_local_exchange, num_buckets, bucket_seq_to_instance_idx)); + pip->sink_x()->get_local_exchange_type(), &do_local_exchange, num_buckets, + bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } return Status::OK(); } @@ -706,58 +708,25 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, return Status::OK(); } -void PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchange_type, - PipelinePtr pipe_with_source, - PipelinePtr pipe_with_sink) { +void PipelineXFragmentContext::_inherit_pipeline_properties( + const DataDistribution& data_distribution, PipelinePtr pipe_with_source, + PipelinePtr pipe_with_sink) { pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks()); pipe_with_source->set_num_tasks(_num_instances); - switch (exchange_type) { - case ExchangeType::HASH_SHUFFLE: - // If HASH_SHUFFLE local exchanger is planned, data will be always HASH distribution so we - // do not need to plan another shuffle local exchange in the rest of current pipeline. - pipe_with_sink->set_need_to_local_shuffle(false); - pipe_with_source->set_need_to_local_shuffle(false); - break; - case ExchangeType::BUCKET_HASH_SHUFFLE: - // Same as ExchangeType::HASH_SHUFFLE. - pipe_with_sink->set_need_to_local_shuffle(false); - pipe_with_source->set_need_to_local_shuffle(false); - break; - case ExchangeType::PASSTHROUGH: - // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make - // sure remaining operators should use local shuffle to make data distribution right. - pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle()); - pipe_with_source->set_need_to_local_shuffle(true); - break; - case ExchangeType::BROADCAST: - // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make - // sure remaining operators should use local shuffle to make data distribution right. - pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle()); - pipe_with_source->set_need_to_local_shuffle(true); - break; - case ExchangeType::ADAPTIVE_PASSTHROUGH: - // ADAPTIVE_PASSTHROUGH is a combination of SHUFFLE and PASSTHROUGH, - // with the former being the SHUFFLE and the latter being the PASSTHROUGH . - pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle()); - pipe_with_source->set_need_to_local_shuffle(true); - break; - default: - __builtin_unreachable(); - } + pipe_with_source->set_data_distribution(data_distribution); } Status PipelineXFragmentContext::_add_local_exchange( int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, - const std::vector& texprs, ExchangeType exchange_type, bool* do_local_exchange, - int num_buckets, const std::map& bucket_seq_to_instance_idx) { + DataDistribution data_distribution, bool* do_local_exchange, int num_buckets, + const std::map& bucket_seq_to_instance_idx, + const bool ignore_data_hash_distribution) { DCHECK(_enable_local_shuffle()); if (_num_instances <= 1) { return Status::OK(); } - if (!cur_pipe->need_to_local_shuffle() && - (exchange_type == ExchangeType::HASH_SHUFFLE || - exchange_type == ExchangeType::BUCKET_HASH_SHUFFLE)) { + if (!cur_pipe->need_to_local_shuffle(data_distribution)) { return Status::OK(); } *do_local_exchange = true; @@ -770,40 +739,42 @@ Status PipelineXFragmentContext::_add_local_exchange( auto new_pip = add_pipeline(cur_pipe, pip_idx + 1); DataSinkOperatorXPtr sink; auto sink_id = next_sink_operator_id(); - sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances, texprs, + sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances, + data_distribution.partition_exprs, bucket_seq_to_instance_idx)); RETURN_IF_ERROR(new_pip->set_sink(sink)); - RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets)); - - // 2. Inherit properties from current pipeline. - _inherit_pipeline_properties(exchange_type, cur_pipe, new_pip); + RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets)); - // 3. Create and initialize LocalExchangeSharedState. + // 2. Create and initialize LocalExchangeSharedState. auto shared_state = LocalExchangeSharedState::create_shared(); - switch (exchange_type) { + switch (data_distribution.distribution_type) { case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = - ShuffleExchanger::create_unique(new_pip->num_tasks(), _num_instances); + ShuffleExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); break; case ExchangeType::BUCKET_HASH_SHUFFLE: shared_state->exchanger = BucketShuffleExchanger::create_unique( - new_pip->num_tasks(), _num_instances, num_buckets); + cur_pipe->num_tasks(), _num_instances, num_buckets, ignore_data_hash_distribution); break; case ExchangeType::PASSTHROUGH: shared_state->exchanger = - PassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances); + PassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); break; case ExchangeType::BROADCAST: shared_state->exchanger = - BroadcastExchanger::create_unique(new_pip->num_tasks(), _num_instances); + BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); + break; + case ExchangeType::PASS_TO_ONE: + shared_state->exchanger = + BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); break; case ExchangeType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = - AdaptivePassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances); + AdaptivePassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); break; default: return Status::InternalError("Unsupported local exchange type : " + - std::to_string((int)exchange_type)); + std::to_string((int)data_distribution.distribution_type)); } shared_state->source_dependencies.resize(_num_instances, nullptr); shared_state->mem_trackers.resize(_num_instances, nullptr); @@ -813,27 +784,27 @@ Status PipelineXFragmentContext::_add_local_exchange( shared_state->sink_dependency = sink_dep.get(); _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}}); - // 4. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to + // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink]. - // 4.1 Initialize new pipeline's operator list. + // 3.1 Initialize new pipeline's operator list. std::copy(operator_xs.begin(), operator_xs.begin() + idx, std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end())); - // 4.2 Erase unused operators in previous pipeline. + // 3.2 Erase unused operators in previous pipeline. operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx); - // 5. Initialize LocalExchangeSource and insert it into this pipeline. + // 4. Initialize LocalExchangeSource and insert it into this pipeline. OperatorXPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); - RETURN_IF_ERROR(source_op->init(exchange_type)); + RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back())); + RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type)); if (!operator_xs.empty()) { RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); } operator_xs.insert(operator_xs.begin(), source_op); - RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back())); - // 6. Set children for two pipelines separately. + // 5. Set children for two pipelines separately. std::vector> new_children; std::vector edges_with_source; for (auto child : cur_pipe->children()) { @@ -852,7 +823,7 @@ Status PipelineXFragmentContext::_add_local_exchange( new_children.push_back(new_pip); edges_with_source.push_back(new_pip->id()); - // 7. Set DAG for new pipelines. + // 6. Set DAG for new pipelines. if (!new_pip->children().empty()) { std::vector edges_with_sink; for (auto child : new_pip->children()) { @@ -865,6 +836,9 @@ Status PipelineXFragmentContext::_add_local_exchange( RETURN_IF_ERROR(new_pip->sink_x()->set_child(new_pip->operator_xs().back())); RETURN_IF_ERROR(cur_pipe->sink_x()->set_child(cur_pipe->operator_xs().back())); + // 7. Inherit properties from current pipeline. + _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip); + CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size()) << "total_op_num: " << total_op_num << " cur_pipe->operator_xs().size(): " << cur_pipe->operator_xs().size() @@ -939,8 +913,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); if (request.__isset.parallel_instances) { - cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); + cur_pipe->set_num_tasks(request.parallel_instances); } break; } @@ -1010,7 +984,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + request.__isset.parallel_instances)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 81672ac465fc52..be0e26461debeb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -123,11 +123,12 @@ class PipelineXFragmentContext : public PipelineFragmentContext { void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool, - PipelinePtr cur_pipe, const std::vector& texprs, - ExchangeType exchange_type, bool* do_local_exchange, int num_buckets, - const std::map& bucket_seq_to_instance_idx); - void _inherit_pipeline_properties(ExchangeType exchange_type, PipelinePtr pipe_with_source, - PipelinePtr pipe_with_sink); + PipelinePtr cur_pipe, DataDistribution data_distribution, + bool* do_local_exchange, int num_buckets, + const std::map& bucket_seq_to_instance_idx, + const bool ignore_data_distribution); + void _inherit_pipeline_properties(const DataDistribution& data_distribution, + PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -156,7 +157,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { Status _plan_local_exchange(int num_buckets, const std::map& bucket_seq_to_instance_idx); Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip, - const std::map& bucket_seq_to_instance_idx); + const std::map& bucket_seq_to_instance_idx, + const bool ignore_data_distribution); bool _has_inverted_index_or_partial_update(TOlapTableSink sink); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 879b225896c181..4072b218e1d3a1 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -98,7 +98,8 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, std::lock_guard l(_lock); auto iter = _consumer_map.find(key); if (iter == _consumer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: CONSUMER", key); + return Status::InvalidArgument("unknown filter: {}, role: CONSUMER. stack trace: {}", key, + get_stack_trace()); } for (auto& holder : iter->second) { consumer_filters.emplace_back(holder.filter); @@ -108,7 +109,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, - bool build_bf_exactly, int merged_rf_num) { + bool build_bf_exactly, bool is_global) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -131,6 +132,20 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); + } else if (is_global) { + if (iter != _consumer_map.end()) { + for (auto holder : iter->second) { + if (holder.node_id == node_id) { + return Status::OK(); + } + } + } + + IRuntimeFilter* filter; + RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options, + RuntimeFilterRole::CONSUMER, node_id, &filter, + build_bf_exactly, is_global)); + _consumer_map[key].emplace_back(node_id, filter); } else { if (iter != _consumer_map.end()) { for (auto holder : iter->second) { @@ -151,7 +166,8 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - bool build_bf_exactly) { + bool build_bf_exactly, bool is_global, + int parallel_tasks) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard l(_lock); @@ -164,7 +180,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc IRuntimeFilter* filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, RuntimeFilterRole::PRODUCER, -1, &filter, - build_bf_exactly)); + build_bf_exactly, is_global, parallel_tasks)); _producer_map.emplace(key, filter); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index e0340216b56191..86e470a706e979 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -83,9 +83,10 @@ class RuntimeFilterMgr { // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, bool build_bf_exactly = false, - int merged_rf_num = 0); + bool is_global = false); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - bool build_bf_exactly = false); + bool build_bf_exactly = false, bool is_global = false, + int parallel_tasks = 0); // update filter by remote Status update_filter(const PPublishFilterRequest* request, diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index d97338da86dddd..b7146c7c6a7e45 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, _blocked_by_rf = std::make_shared(false); } -Status RuntimeFilterConsumer::init(RuntimeState* state, int parallel_tasks) { +Status RuntimeFilterConsumer::init(RuntimeState* state, bool is_global) { _state = state; - RETURN_IF_ERROR(_register_runtime_filter(parallel_tasks)); + RETURN_IF_ERROR(_register_runtime_filter(is_global)); return Status::OK(); } @@ -45,7 +45,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { profile->add_info_string("RuntimeFilters: ", ss.str()); } -Status RuntimeFilterConsumer::_register_runtime_filter(int parallel_tasks) { +Status RuntimeFilterConsumer::_register_runtime_filter(bool is_global) { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); @@ -58,12 +58,18 @@ Status RuntimeFilterConsumer::_register_runtime_filter(int parallel_tasks) { // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) // 2. This filter is bloom filter (only bloom filter should be used for merging) RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, false)); + filter_desc, _state->query_options(), _filter_id, false, is_global)); + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( + filter_desc.filter_id, _filter_id, &runtime_filter)); + } else if (is_global) { + // For pipelineX engine, runtime filter is global iff data distribution is ignored. + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( + filter_desc, _state->query_options(), _filter_id, false, is_global)); RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( filter_desc.filter_id, _filter_id, &runtime_filter)); } else { RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, false)); + filter_desc, _state->query_options(), _filter_id, false, is_global)); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter( filter_desc.filter_id, _filter_id, &runtime_filter)); } diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 00c10cfa9d8d89..15b9455ac56de4 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -30,7 +30,7 @@ class RuntimeFilterConsumer { const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); ~RuntimeFilterConsumer() = default; - Status init(RuntimeState* state, int parallel_tasks = 0); + Status init(RuntimeState* state, bool is_global = false); // Try to append late arrived runtime filters. // Return num of filters which are applied already. @@ -42,7 +42,7 @@ class RuntimeFilterConsumer { protected: // Register and get all runtime filters at Init phase. - Status _register_runtime_filter(int parallel_tasks); + Status _register_runtime_filter(bool is_global); // Get all arrived runtime filters at Open phase. Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 2f1451c5c47e85..55cfc2d232f4a1 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -106,6 +106,22 @@ Status Channel::init(RuntimeState* state) { return Status::OK(); } +template +std::shared_ptr +PipChannel::get_local_channel_dependency() { + if (!Channel::_local_recvr) { + if constexpr (std::is_same_v) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "_local_recvr is null: " + + std::to_string(Channel::_parent->parent()->node_id())); + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null"); + } + } + return Channel::_local_recvr->get_local_channel_dependency( + Channel::_parent->sender_id()); +} + template Status Channel::send_current_block(bool eos, Status exec_status) { // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario @@ -825,6 +841,8 @@ bool VDataStreamSender::channel_all_can_write() { template class Channel; template class Channel; +template class PipChannel; +template class PipChannel; template class Channel; template class BlockSerializer; template class BlockSerializer; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index d66295c8705ef8..c4bf7fdd4564cf 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -546,13 +546,7 @@ class PipChannel final : public Channel { return _send_callback; } - std::shared_ptr get_local_channel_dependency() { - if (!Channel::_local_recvr) { - throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null"); - } - return Channel::_local_recvr->get_local_channel_dependency( - Channel::_parent->sender_id()); - } + std::shared_ptr get_local_channel_dependency(); private: friend class VDataStreamSender; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index cff906df208553..ea6467db49623c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -363,6 +363,10 @@ private void pushDownRuntimeFilterCommon(PhysicalHashJoin legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); + if (ctx.getSessionVariable().isIgnoreScanDistribution()) { + legalTypes.clear(); + legalTypes.add(TRuntimeFilterType.BLOOM); + } List hashJoinConjuncts = join.getEqualToConjuncts(); for (int i = 0; i < hashJoinConjuncts.size(); i++) { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3c8dd8c1dc80e6..bbf2b95f1204ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1624,7 +1624,9 @@ private void computeFragmentExecParams() throws Exception { }); } else { // add destination host to this fragment's destination - for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + int parallelTasksNum = destParams.ignoreDataDistribution + ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); + for (int j = 0; j < parallelTasksNum; ++j) { TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7578da745e030f..1fcefb3c7b6c64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -787,7 +787,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) - private boolean ignoreScanDistribution = false; + private boolean ignoreScanDistribution = true; @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, From 53c40c8993b37b86265a8eb7b320c3b0109e022a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 20 Dec 2023 14:46:14 +0800 Subject: [PATCH 2/4] update --- be/src/exprs/runtime_filter.cpp | 20 +++++--------------- be/src/exprs/runtime_filter.h | 6 ++++++ be/src/pipeline/pipeline.h | 9 ++++----- be/src/pipeline/pipeline_x/dependency.h | 6 ++++++ be/src/runtime/runtime_filter_mgr.cpp | 2 +- 5 files changed, 22 insertions(+), 21 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index d13aa71c4e6e0a..8e8e3dfd8cdb66 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -889,21 +889,13 @@ class RuntimePredicateWrapper { return Status::OK(); } - PrimitiveType column_type() { - return _column_return_type; - } + PrimitiveType column_type() { return _column_return_type; } - bool is_bloomfilter() const { - return _is_bloomfilter; - } + bool is_bloomfilter() const { return _is_bloomfilter; } - bool is_ignored_in_filter() const { - return _is_ignored_in_filter; - } + bool is_ignored_in_filter() const { return _is_ignored_in_filter; } - std::string* get_ignored_in_filter_msg() const { - return _ignored_in_filter_msg; - } + std::string* get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr& _hybrid_set, @@ -914,9 +906,7 @@ class RuntimePredicateWrapper { } } - size_t get_in_filter_size() const { - return _context.hybrid_set->size(); - } + size_t get_in_filter_size() const { return _context.hybrid_set->size(); } std::shared_ptr get_bitmap_filter() const { return _context.bitmap_filter_func; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 5353f7da97ee2e..32fee99173e78b 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -459,8 +459,14 @@ class IRuntimeFilter { std::unique_ptr _profile; RuntimeProfile::Counter* _merge_local_rf_timer = nullptr; bool _opt_remote_rf; + // `_is_global` indicates whether this runtime filter is global on this BE. + // All runtime filters should be merged on each BE if it is global. + // This is improvement for pipelineX. const bool _is_global = false; std::mutex _local_merge_mutex; + // There are `_parallel_build_tasks` pipeline tasks to build runtime filter. + // We should call `signal` once all runtime filters are done and merged to one + // (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`). int _merged_rf_num = 0; const int _parallel_build_tasks = -1; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 1b123cb5ab4494..70f2a273151ae3 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -125,6 +125,8 @@ class Pipeline : public std::enable_shared_from_this { bool need_to_local_shuffle(const DataDistribution target_data_distribution) const { if (target_data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE || target_data_distribution.distribution_type == ExchangeType::HASH_SHUFFLE) { + // If `_data_distribution` of this pipeline does not match the `target_data_distribution`, + // we should do local shuffle. return target_data_distribution.operator!=(_data_distribution); } return true; @@ -218,11 +220,8 @@ class Pipeline : public std::enable_shared_from_this { bool _is_root_pipeline = false; bool _collect_query_statistics_with_every_batch = false; - // If source operator meets one of all conditions below: - // 1. is scan operator with Hash Bucket - // 2. is exchange operator with Hash/BucketHash partition - // then set `_need_to_local_shuffle` to false which means we should use local shuffle in this fragment - // because data already be partitioned by storage/shuffling. + // Input data distribution of this pipeline. We do local exchange when input data distribution + // does not match the target data distribution. DataDistribution _data_distribution {ExchangeType::NOOP}; // How many tasks should be created ? diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 3070693f1d4ade..b9f92f9fa3e623 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -561,11 +561,17 @@ struct SetSharedState : public BasicSharedState { enum class ExchangeType : uint8_t { NOOP = 0, + // Shuffle data by Crc32HashPartitioner. HASH_SHUFFLE = 1, + // Round-robin passthrough data blocks. PASSTHROUGH = 2, + // Shuffle data by Crc32HashPartitioner (e.g. same as storage engine). BUCKET_HASH_SHUFFLE = 3, + // Passthrough data blocks to all channels. BROADCAST = 4, + // Passthrough data to channels evenly in an adaptive way. ADAPTIVE_PASSTHROUGH = 5, + // Send all data to the first channel. PASS_TO_ONE = 6, }; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 4072b218e1d3a1..588640c1612664 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -142,7 +142,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options, + RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options, RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global)); _consumer_map[key].emplace_back(node_id, filter); From 58194850684973a362fd942e9cd81d1aadb05f69 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 20 Dec 2023 17:00:14 +0800 Subject: [PATCH 3/4] update --- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- be/src/pipeline/exec/partition_sort_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/pipeline_x/dependency.h | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index b7966724ebd929..2cd6ef5093970a 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -371,7 +371,7 @@ class AggSinkOperatorX : public DataSinkOperatorX { return _needs_finalize || DataSinkOperatorX::_child_x ->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataDistribution(ExchangeType::NOOP); + : DataSinkOperatorX::get_local_exchange_type(); } return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index c938a74e8560c4..14ed8c815b1287 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -115,7 +115,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::get_local_exchange_type(); } private: diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 81bc6b9a3fdf98..1a47e0fa133500 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -107,7 +107,7 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX::get_local_exchange_type(); } return {ExchangeType::PASSTHROUGH}; } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index c8a4dda7795bde..3146e915eefccd 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -98,7 +98,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { // The current sort node is used for the ORDER BY return {ExchangeType::PASSTHROUGH}; } - return {ExchangeType::NOOP}; + return DataSinkOperatorX::get_local_exchange_type(); } private: diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index b9f92f9fa3e623..a136ed710fe8f6 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -126,7 +126,7 @@ class Dependency : public std::enable_shared_from_this { std::atomic _ready; const QueryContext* _query_ctx = nullptr; - std::shared_ptr _shared_state; + std::shared_ptr _shared_state = nullptr; MonotonicStopWatch _watcher; std::list> _children; From 50c325dbc3bbd71f94e42188afc0205a75527ae7 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 20 Dec 2023 18:40:52 +0800 Subject: [PATCH 4/4] update --- be/src/runtime/runtime_filter_mgr.cpp | 3 +-- .../processor/post/RuntimeFilterGenerator.java | 8 +++++++- .../org/apache/doris/planner/OlapScanNode.java | 2 +- .../java/org/apache/doris/planner/ScanNode.java | 4 ++-- .../java/org/apache/doris/qe/Coordinator.java | 12 ++++++------ .../org/apache/doris/qe/SessionVariable.java | 17 +++++++++-------- 6 files changed, 26 insertions(+), 20 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 588640c1612664..b67bc5ffd3b461 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -98,8 +98,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, std::lock_guard l(_lock); auto iter = _consumer_map.find(key); if (iter == _consumer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: CONSUMER. stack trace: {}", key, - get_stack_trace()); + return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", key); } for (auto& holder : iter->second) { consumer_filters.emplace_back(holder.filter); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index ea6467db49623c..bae483c29397f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -289,6 +289,11 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); - if (ctx.getSessionVariable().isIgnoreScanDistribution()) { + if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) { + // If storage data distribution is ignored, we use BLOOM filter. legalTypes.clear(); legalTypes.add(TRuntimeFilterType.BLOOM); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index cbf832053e16c3..8d7a5c0ab261e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1297,7 +1297,7 @@ public int getNumInstances() { return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); } if (ConnectContext.get().getSessionVariable().getEnablePipelineXEngine() - && ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) { + && ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) { return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); } return scanRangeLocations.size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 6eece9aa5459e2..ff744d52627877 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -720,9 +720,9 @@ public boolean shouldDisableSharedScan(ConnectContext context) { || getShouldColoScan(); } - public boolean ignoreScanDistribution(ConnectContext context) { + public boolean ignoreStorageDataDistribution(ConnectContext context) { return !isKeySearch() && context != null - && context.getSessionVariable().isIgnoreScanDistribution() + && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.isHasColocateFinalizeAggNode() && !fragment.isHasNullAwareLeftAntiJoin(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index bbf2b95f1204ea..0a89d6f5afb0df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2010,7 +2010,7 @@ private void computeFragmentHosts() throws Exception { // 4. Disable shared scan optimization by session variable boolean sharedScan = true; if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) - || (node.get().ignoreScanDistribution(context) && useNereids))) { + || (node.get().ignoreStorageDataDistribution(context) && useNereids))) { int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2837,9 +2837,9 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); Set scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); - boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> { + boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> { return scanNodeIds.contains(scanNode.getId().asInt()); - }).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids; + }).allMatch(node -> node.ignoreStorageDataDistribution(context)) && useNereids; // 1. count each node in one fragment should scan how many tablet, gather them in one list Map>>>> addressToScanRanges @@ -2870,7 +2870,7 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc Map> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>()); - if (ignoreScanDistribution) { + if (ignoreStorageDataDistribution) { FInstanceExecParam instanceParam = new FInstanceExecParam( null, addressScanRange.getKey(), 0, params); @@ -2926,8 +2926,8 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc } } } - params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size(); - params.ignoreDataDistribution = ignoreScanDistribution; + params.parallelTasksNum = ignoreStorageDataDistribution ? 1 : params.instanceExecParams.size(); + params.ignoreDataDistribution = ignoreStorageDataDistribution; } private final Map fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1fcefb3c7b6c64..182cfc4424aff3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -219,7 +219,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine"; public static final String ENABLE_SHARED_SCAN = "enable_shared_scan"; - public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution"; + + public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; @@ -785,9 +786,9 @@ public class SessionVariable implements Serializable, Writable { needForward = true) private boolean enableSharedScan = false; - @VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, - needForward = true) - private boolean ignoreScanDistribution = true; + @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean ignoreStorageDataDistribution = false; @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, @@ -3173,11 +3174,11 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() { return materializedViewRewriteEnableContainForeignTable; } - public boolean isIgnoreScanDistribution() { - return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle; + public boolean isIgnoreStorageDataDistribution() { + return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle; } - public void setIgnoreScanDistribution(boolean ignoreScanDistribution) { - this.ignoreScanDistribution = ignoreScanDistribution; + public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) { + this.ignoreStorageDataDistribution = ignoreStorageDataDistribution; } }