From 9b2e450fed52452b240fa6cc4bc0df44cf5b8bee Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 26 Sep 2024 16:25:42 +0800 Subject: [PATCH 1/2] [fix](shuffle) Fix remaining tasks if all tasks are running on single BE --- .../pipeline/exec/exchange_sink_operator.cpp | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 4773c7a90bd89f..518620ba6b4d3e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -132,13 +132,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { PUniqueId id; id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - _sink_buffer = std::make_unique(id, p._dest_node_id, _sender_id, - _state->be_number(), state, this); - register_channels(_sink_buffer.get()); - _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "ExchangeSinkQueueDependency", true); - _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + if (!only_local_exchange) { + _sink_buffer = std::make_unique(id, p._dest_node_id, _sender_id, + _state->be_number(), state, this); + register_channels(_sink_buffer.get()); + _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "ExchangeSinkQueueDependency", true); + _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + _finish_dependency->block(); + } + if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = Dependency::create_shared( @@ -244,7 +248,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { fmt::format("Crc32HashPartitioner({})", _partition_count)); } - _finish_dependency->block(); if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { @@ -559,8 +562,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block final_st = st; } } - local_state._sink_buffer->set_should_stop(); - return final_st; + if (local_state._sink_buffer) { + local_state._sink_buffer->set_should_stop(); + } } return final_st; } @@ -631,11 +635,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " - "_reach_limit: {}", - _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), - _sink_buffer->_is_finishing.load(), _reach_limit.load()); + if (_sink_buffer) { + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " + "_reach_limit: {}", + _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), + _sink_buffer->_is_finishing.load(), _reach_limit.load()); + } return fmt::to_string(debug_string_buffer); } From fa0b289b37031b71a9160cb21376c9a4ed81042b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 27 Sep 2024 09:49:09 +0800 Subject: [PATCH 2/2] update --- be/src/pipeline/exec/exchange_sink_operator.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 300e2a5172f3d1..adf8a3424706d2 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -64,7 +64,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { std::vector dependencies() const override { std::vector dep_vec; - dep_vec.push_back(_queue_dependency.get()); + if (_queue_dependency) { + dep_vec.push_back(_queue_dependency.get()); + } if (_broadcast_dependency) { dep_vec.push_back(_broadcast_dependency.get()); }