From 0c52ad6c2e6a3e2e9b84d8051f5bf162247df4af Mon Sep 17 00:00:00 2001 From: plat1ko Date: Mon, 24 Jun 2024 14:28:22 +0800 Subject: [PATCH] Fix data loss when node channel has been cancelled before close wait --- be/src/vec/sink/writer/vtablet_writer.cpp | 39 ++++++++++++----------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 3aa9c7992166b2..f385575e7c0a1c 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1312,22 +1312,22 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -static Status cancel_channel_and_check_intolerable_failure( - Status status, const std::string& err_msg, const std::shared_ptr ich, - const std::shared_ptr nch) { - LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg; - ich->mark_as_failed(nch.get(), err_msg, -1); +static Status cancel_channel_and_check_intolerable_failure(Status status, + const std::string& err_msg, + IndexChannel& ich, VNodeChannel& nch) { + LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg; + ich.mark_as_failed(&nch, err_msg, -1); // cancel the node channel in best effort - nch->cancel(err_msg); + nch.cancel(err_msg); // check if index has intolerable failure - Status index_st = ich->check_intolerable_failure(); + Status index_st = ich.check_intolerable_failure(); if (!index_st.ok()) { - status = index_st; - } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) { - status = st; - } else if (Status st = ich->check_tablet_filtered_rows_consistency(); !st.ok()) { - status = st; + status = std::move(index_st); + } else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) { + status = std::move(st); + } else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) { + status = std::move(st); } return status; } @@ -1403,7 
+1403,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(true); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); if (!status.ok()) { @@ -1419,7 +1420,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status << "close1 wait finished!"; if (!s.ok()) { status = cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); + std::move(status), s.to_string(), *index_channel, *ch); } }); if (!status.ok()) { @@ -1437,7 +1438,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); } else { // not has_incremental_node_channel @@ -1451,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); } @@ -1507,7 +1510,7 @@ Status VTabletWriter::close(Status exec_status) { &total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns, &wait_exec_time, &total_add_batch_num](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { + if (!status.ok() || (ch->is_closed() && !ch->is_cancelled())) { return; } // in pipeline, all node channels are done or canceled, will not block. 
@@ -1515,7 +1518,7 @@ Status VTabletWriter::close(Status exec_status) { auto s = ch->close_wait(_state); if (!s.ok()) { status = cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); + std::move(status), s.to_string(), *index_channel, *ch); } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, &actual_consume_ns,