From 56fbd8d6f5e1b89ccfc44d0c849399aab9125f66 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 15 Aug 2023 10:27:02 +0800 Subject: [PATCH] init _instance_to_rpc_ctx on register_sink --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 7ff0f7b50319b9..bf43c3b860cc40 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -113,6 +113,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); _instance_to_sending_by_pipeline[low_id] = true; + _instance_to_rpc_ctx[low_id] = {}; _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; _construct_request(low_id, finst_id); @@ -191,10 +192,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } auto* closure = request.channel->get_closure(id, request.eos, nullptr); - ExchangeRpcContext rpc_ctx; - rpc_ctx._closure = closure; - rpc_ctx.is_cancelled = false; - _instance_to_rpc_ctx[id] = rpc_ctx; + _instance_to_rpc_ctx[id]._closure = closure; + _instance_to_rpc_ctx[id].is_cancelled = false; closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) {