From add28659b7a4345706038865b25ffdd3ffc03223 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 6 Aug 2024 09:59:35 +0800 Subject: [PATCH] [fix](pipeline) Fix mem control in local exchanger (#38885) ## Proposed changes If a block (>128M) is dequeue by local exchange source operator and it is the last block, both of source operators and sink operators will be hang. This PR fixed it. --- be/src/pipeline/pipeline_x/dependency.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index ba0d09620028c9..dfdb13b4414beb 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -788,13 +788,13 @@ struct LocalExchangeSharedState : public BasicSharedState { } void add_total_mem_usage(size_t delta) { - if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) { + if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) { sink_deps.front()->block(); } } void sub_total_mem_usage(size_t delta) { - if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) { + if (mem_usage.fetch_sub(delta) - delta <= config::local_exchange_buffer_mem_limit) { sink_deps.front()->set_ready(); } }