From d94f66d56ae31298eeecc0ee19c0fe76aadae73e Mon Sep 17 00:00:00 2001 From: Mryange Date: Fri, 24 Jan 2025 05:04:20 +0800 Subject: [PATCH] [fix](exchange) fix exchange sink buffer not updating total_queue_size on EOF. (#47312) https://github.com/apache/doris/pull/41602 On EOF, _set_receiver_eof clears _instance_to_package_queue but does not update _total_queue_size, so every check that relies on _total_queue_size is made against a stale count. UT output before the fix: ``` mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :2 mock transmit_blockv2 dest ins id :3 queue size : 6 each queue size : Instance: 2, queue size: 2 Instance: 1, queue size: 2 Instance: 3, queue size: 2 queue size : 6 // wrong: should be 4 (0 + 2 + 2) after instance 2 reached EOF each queue size : Instance: 2, queue size: 0 Instance: 1, queue size: 2 Instance: 3, queue size: 2 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :3 mock transmit_blockv2 dest ins id :3 ``` --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 24 ++++++++++- be/src/pipeline/exec/exchange_sink_buffer.h | 3 ++ .../pipeline/exec/exchange_sink_operator.cpp | 13 +++--- be/test/vec/exec/exchange_sink_test.cpp | 40 +++++++++++++++++++ 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 3fb460c9cc7408..a776025c67611a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -42,6 +42,7 @@ #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/backend_options.h" +#include "util/defer_op.h" #include "util/proto_util.h" #include "util/time.h" #include "vec/sink/vdata_stream_sender.h" @@ -445,7 +446,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { // When the receiving side reaches eof, it means the receiver has finished early. 
// The remaining data in the current rpc_channel does not need to be sent, // and the rpc_channel should be turned off immediately. - _turn_off_channel(id, lock); + Defer turn_off([&]() { _turn_off_channel(id, lock); }); + std::queue>& broadcast_q = _instance_to_broadcast_package_queue[id]; for (; !broadcast_q.empty(); broadcast_q.pop()) { @@ -461,12 +463,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue + if (_total_queue_size <= _queue_capacity) { + for (auto& [_, dep] : _queue_deps) { + dep->set_ready(); + } + } + { std::queue> empty; swap(empty, q); @@ -578,6 +590,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { } } +std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { + fmt::memory_buffer debug_string_buffer; + for (auto& [id, m] : _instance_to_package_queue_mutex) { + std::unique_lock lock(*m); + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, + _instance_to_package_queue[id].size()); + } + return fmt::to_string(debug_string_buffer); +} + } // namespace pipeline #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 899a2991110353..51698c118cd600 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -250,6 +250,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { } void set_low_memory_mode() { _queue_capacity = 8; } + std::string 
debug_each_instance_queue_size(); #ifdef BE_TEST public: #else @@ -319,6 +320,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); + // _total_queue_size is the sum of the sizes of all instance_to_package_queues. + // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. std::atomic _total_queue_size = 0; // _running_sink_count is used to track how many sinks have not finished yet. diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 573f4aa840ec56..c6ac3b80d880f0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -507,12 +507,13 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); if (_sink_buffer) { - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " - "{}, queue dep: {}), _reach_limit: {}, working channels: {}", - _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, - _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), - _reach_limit.load(), _working_channels_count.load()); + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " + "{}, queue dep: {}), _reach_limit: {}, working channels: {} , each queue size: {}", + _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, + _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(), + _working_channels_count.load(), _sink_buffer->debug_each_instance_queue_size()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/test/vec/exec/exchange_sink_test.cpp b/be/test/vec/exec/exchange_sink_test.cpp 
index 9576ed71ee2ada..7dbd352bd3a23f 100644 --- a/be/test/vec/exec/exchange_sink_test.cpp +++ b/be/test/vec/exec/exchange_sink_test.cpp @@ -193,4 +193,44 @@ TEST_F(ExchangeSInkTest, test_error_end) { } } +TEST_F(ExchangeSInkTest, test_queue_size) { + { + auto state = create_runtime_state(); + auto buffer = create_buffer(state); + + auto sink1 = create_sink(state, buffer); + + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 6); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + pop_block(dest_ins_id_2, PopState::eof); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 4); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false); + clear_all_done(); + } +} + } // namespace doris::vectorized