Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ void register_suites() {
should_ret = true;
});
});
// Injection suite "test_cancel_node_channel": installs two sync-point callbacks,
// one that forces an error status and one that stalls for five seconds.
suite_map.emplace("test_cancel_node_channel", [] {
    // Replaces the Status pointed to by the first callback argument with an
    // InternalError so the instrumented call site observes a failure.
    auto fail_send_block = [](auto&& args) {
        LOG(INFO) << "injection VNodeChannel::try_send_block";
        auto* injected_status = try_any_cast<Status*>(args[0]);
        *injected_status =
                Status::InternalError<false>("test_cancel_node_channel injection error");
    };

    // Blocks the calling thread for five seconds when the point is hit.
    auto delay_close = [](auto&&) {
        const auto pause = std::chrono::seconds(5);
        std::this_thread::sleep_for(pause);
    };

    auto* sync_point = SyncPoint::get_instance();
    sync_point->set_call_back("VNodeChannel::try_send_block", fail_send_block);
    sync_point->set_call_back("VOlapTableSink::close", delay_close);
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
42 changes: 24 additions & 18 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <vector>

#include "cloud/config.h"
#include "common/sync_point.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
Expand Down Expand Up @@ -627,6 +628,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
&uncompressed_bytes, &compressed_bytes,
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.to_string()));
_send_block_callback->clear_in_flight();
Expand Down Expand Up @@ -1325,22 +1327,22 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}

// Records a failed node channel on its index channel, cancels the node channel,
// and then re-evaluates the index channel.
//
// status:  the caller's current status; returned as-is when every index-level
//          check below passes.
// err_msg: failure reason; it is logged and forwarded to both
//          IndexChannel::mark_as_failed() and VNodeChannel::cancel().
// ich:     index channel that owns `nch`.
// nch:     node channel that failed.
//
// Returns the first non-OK result of, in order, check_intolerable_failure(),
// check_tablet_received_rows_consistency() and
// check_tablet_filtered_rows_consistency(); otherwise returns `status`.
static Status cancel_channel_and_check_intolerable_failure(Status status,
                                                           const std::string& err_msg,
                                                           IndexChannel& ich, VNodeChannel& nch) {
    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg;
    // NOTE(review): -1 is presumably a "no specific tablet" id — confirm against
    // IndexChannel::mark_as_failed.
    ich.mark_as_failed(&nch, err_msg, -1);
    // cancel the node channel in best effort
    nch.cancel(err_msg);

    // check if index has intolerable failure
    Status index_st = ich.check_intolerable_failure();
    if (!index_st.ok()) {
        status = std::move(index_st);
    } else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) {
        status = std::move(st);
    } else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) {
        status = std::move(st);
    }
    return status;
}
Expand Down Expand Up @@ -1416,7 +1418,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
ch->mark_close(true);
if (ch->is_cancelled()) {
status = cancel_channel_and_check_intolerable_failure(
status, ch->get_cancel_msg(), index_channel, ch);
std::move(status), ch->get_cancel_msg(), *index_channel,
*ch);
}
});
if (!status.ok()) {
Expand All @@ -1432,7 +1435,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
<< "close1 wait finished!";
if (!s.ok()) {
status = cancel_channel_and_check_intolerable_failure(
status, s.to_string(), index_channel, ch);
std::move(status), s.to_string(), *index_channel, *ch);
}
});
if (!status.ok()) {
Expand All @@ -1450,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status = cancel_channel_and_check_intolerable_failure(
status, ch->get_cancel_msg(), index_channel, ch);
std::move(status), ch->get_cancel_msg(), *index_channel,
*ch);
}
});
} else { // not has_incremental_node_channel
Expand All @@ -1464,7 +1468,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status = cancel_channel_and_check_intolerable_failure(
status, ch->get_cancel_msg(), index_channel, ch);
std::move(status), ch->get_cancel_msg(), *index_channel,
*ch);
}
});
}
Expand All @@ -1491,6 +1496,7 @@ Status VTabletWriter::close(Status exec_status) {

// will make the last batch of request-> close_wait will wait this finished.
_do_try_close(_state, exec_status);
TEST_INJECTION_POINT("VOlapTableSink::close");

// If _close_status is not ok, all nodes have been canceled in try_close.
if (_close_status.ok()) {
Expand Down Expand Up @@ -1520,15 +1526,15 @@ Status VTabletWriter::close(Status exec_status) {
&total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns,
&wait_exec_time,
&total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
if (!status.ok() || (ch->is_closed() && !ch->is_cancelled())) {
return;
}
// in pipeline, all node channels are done or canceled, will not block.
// no pipeline, close may block waiting.
auto s = ch->close_wait(_state);
if (!s.ok()) {
status = cancel_channel_and_check_intolerable_failure(
status, s.to_string(), index_channel, ch);
std::move(status), s.to_string(), *index_channel, *ch);
}
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
&channel_stat, &queue_push_lock_ns, &actual_consume_ns,
Expand Down