From 536a9ee2e609c979b58656769551d6d36e78a03e Mon Sep 17 00:00:00 2001 From: liaoxin Date: Tue, 6 Feb 2024 17:11:15 +0800 Subject: [PATCH] [Fix](load) fix load channel leak when load exception occurs (#30915) --- be/src/vec/sink/vtablet_sink.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 606aded9d350ba..56e1829202c287 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -292,6 +292,8 @@ void VNodeChannel::clear_all_blocks() { _cur_mutable_block.reset(); } +// we don't need to send tablet_writer_cancel rpc request when +// init failed, so set _is_closed to true. // if "_cancelled" is set to true, // no need to set _cancel_msg because the error will be // returned directly via "TabletSink::prepare()" method. @@ -302,6 +304,7 @@ Status VNodeChannel::init(RuntimeState* state) { auto node = _parent->_nodes_info->find_node(_node_id); if (node == nullptr) { _cancelled = true; + _is_closed = true; return Status::InternalError("unknown node id, id={}", _node_id); } @@ -317,6 +320,7 @@ Status VNodeChannel::init(RuntimeState* state) { _node_info.brpc_port); if (_stub == nullptr) { _cancelled = true; + _is_closed = true; return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}", _node_info.host, _node_info.brpc_port, channel_info()); } @@ -831,8 +835,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) { _next_packet_seq++; } +// When _cancelled is true, we still need to send a tablet_writer_cancel +// rpc request to truly release the load channel void VNodeChannel::cancel(const std::string& cancel_msg) { - if (_is_closed || _cancelled) { + if (_is_closed) { // skip the channels that have been canceled or close_wait. return; }