Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ void VNodeChannel::clear_all_blocks() {
_cur_mutable_block.reset();
}

// we don't need to send tablet_writer_cancel rpc request when
// init failed, so set _is_closed to true.
// if "_cancelled" is set to true,
// no need to set _cancel_msg because the error will be
// returned directly via "TabletSink::prepare()" method.
Expand All @@ -302,6 +304,7 @@ Status VNodeChannel::init(RuntimeState* state) {
auto node = _parent->_nodes_info->find_node(_node_id);
if (node == nullptr) {
_cancelled = true;
_is_closed = true;
return Status::InternalError("unknown node id, id={}", _node_id);
}

Expand All @@ -317,6 +320,7 @@ Status VNodeChannel::init(RuntimeState* state) {
_node_info.brpc_port);
if (_stub == nullptr) {
_cancelled = true;
_is_closed = true;
return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}",
_node_info.host, _node_info.brpc_port, channel_info());
}
Expand Down Expand Up @@ -831,8 +835,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
_next_packet_seq++;
}

// When _cancelled is true, we still need to send a tablet_writer_cancel
// rpc request to truly release the load channel
void VNodeChannel::cancel(const std::string& cancel_msg) {
if (_is_closed || _cancelled) {
if (_is_closed) {
// skip the channels that have been canceled or close_wait.
return;
}
Expand Down