From 2c6206949d19da114460e2224fb6a79d51af652c Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 15 Aug 2023 13:02:28 +0800 Subject: [PATCH] [Bug](exchange) init _instance_to_rpc_ctx on register_sink (#22976) init _instance_to_rpc_ctx on register_sink --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 7ff0f7b50319b9..bf43c3b860cc40 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -113,6 +113,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); _instance_to_sending_by_pipeline[low_id] = true; + _instance_to_rpc_ctx[low_id] = {}; _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; _construct_request(low_id, finst_id); @@ -191,10 +192,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } auto* closure = request.channel->get_closure(id, request.eos, nullptr); - ExchangeRpcContext rpc_ctx; - rpc_ctx._closure = closure; - rpc_ctx.is_cancelled = false; - _instance_to_rpc_ctx[id] = rpc_ctx; + _instance_to_rpc_ctx[id]._closure = closure; + _instance_to_rpc_ctx[id].is_cancelled = false; closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) {