diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1e257e1f86ff6e..9cedfaa683555f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -431,11 +431,19 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue + if (_queue_dependency && _total_queue_size <= _queue_capacity) { + _queue_dependency->set_ready(); + } + { std::queue> empty; swap(empty, q);