From 1e730e293c9928b7ebb3fea6d1ebd3eb16d2f696 Mon Sep 17 00:00:00 2001 From: Mryange Date: Wed, 22 Jan 2025 18:40:49 +0800 Subject: [PATCH 1/2] fix --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1e257e1f86ff6e..9869ea4b3d2692 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -431,11 +431,16 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + if (_queue_dependency && _total_queue_size <= _queue_capacity) { + _queue_dependency->set_ready(); + } + { std::queue> empty; swap(empty, q); From c4183905300367bccd0113145e1c5ced73ed45b2 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 23 Jan 2025 19:39:53 +0800 Subject: [PATCH 2/2] upd --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 9869ea4b3d2692..9cedfaa683555f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -431,12 +431,15 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue if (_queue_dependency && _total_queue_size <= _queue_capacity) { _queue_dependency->set_ready(); }