58 changes: 32 additions & 26 deletions be/src/exec/tablet_sink.cpp
@@ -183,12 +183,16 @@ Status NodeChannel::open_wait() {

// add batch closure
_add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this]() {
_add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
// If rpc failed, mark all tablets on this node channel as failed
_index_channel->mark_as_failed(this, _add_batch_closure->cntl.ErrorText(), -1);
_index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1);
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
} else if (is_last_rpc) {
// If this is the last rpc, _add_batches_finished must be set; otherwise the node channel's
// close_wait() will be blocked.
_add_batches_finished = true;
}
});
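
Note on the failure path above: close_wait() blocks until `_add_batches_finished` becomes true, so the handler has to set the flag even when the final rpc fails. A minimal sketch of that contract, using simplified placeholder names rather than the real NodeChannel members:

```cpp
// Sketch only: a waiter spins on a "finished" flag, so whichever path handles the
// last rpc (success or failure) must flip the flag or the waiter hangs forever.
#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> add_batches_finished{false};

void on_last_rpc_done(bool rpc_failed) {
    // Regardless of the rpc outcome, the waiter must eventually be released;
    // the failure itself is surfaced elsewhere (e.g. via a cancel message).
    (void)rpc_failed;
    add_batches_finished = true;
}

void close_wait_like_loop() {
    while (!add_batches_finished) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
```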

@@ -198,7 +202,7 @@ Status NodeChannel::open_wait() {
if (status.ok()) {
// if has error tablet, handle them first
for (auto& error : result.tablet_errors()) {
_index_channel->mark_as_failed(this, error.msg(), error.tablet_id());
_index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), error.tablet_id());
}

Status st = _index_channel->check_intolerable_failure();
@@ -493,7 +497,7 @@ void NodeChannel::try_send_batch() {
// eos request must be the last request
_add_batch_closure->end_mark();
_send_finished = true;
DCHECK(_pending_batches_num == 0);
CHECK(_pending_batches_num == 0) << _pending_batches_num;
}

if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
@@ -541,13 +545,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
LOG(WARNING) << "unknown tablet, tablet_id=" << tablet.tablet_id;
return Status::InternalError("unknown tablet");
}
std::vector<NodeChannel*> channels;
std::vector<std::shared_ptr<NodeChannel>> channels;
for (auto& node_id : location->node_ids) {
NodeChannel* channel = nullptr;
std::shared_ptr<NodeChannel> channel;
auto it = _node_channels.find(node_id);
if (it == _node_channels.end()) {
channel =
_parent->_pool->add(new NodeChannel(_parent, this, node_id, _schema_hash));
// NodeChannel is not added to _parent->_pool, because destructing a NodeChannel may take a
// long time waiting for its rpc to finish, while the ObjectPool holds a spin lock when
// deleting its objects.
channel = std::make_shared<NodeChannel>(_parent, this, node_id, _schema_hash);
_node_channels.emplace(node_id, channel);
} else {
channel = it->second;
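
Note on the ownership change above: destroying a NodeChannel can block while an in-flight rpc drains, which is unsafe to do while ObjectPool holds its spin lock, so the channels are now held by shared_ptr and destroyed outside the pool. A rough sketch of the idea, with invented types rather than the real classes:

```cpp
// Sketch only: a destructor that may block (waiting on an rpc) should not run under
// a pool-wide spin lock. shared_ptr lets the last owner destroy the channel on its
// own, independent of any ObjectPool-style bulk deletion.
#include <cstdint>
#include <future>
#include <memory>
#include <unordered_map>

struct SlowToDestroyChannel {
    std::future<void> inflight_rpc;
    ~SlowToDestroyChannel() {
        if (inflight_rpc.valid()) {
            inflight_rpc.wait();  // may take a long time
        }
    }
};

// Ownership via shared_ptr, keyed by node id, mirroring _node_channels.
std::unordered_map<int64_t, std::shared_ptr<SlowToDestroyChannel>> node_channels;
```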
@@ -571,7 +577,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
// if this node channel is already failed, this add_row will be skipped
auto st = channel->add_row(tuple, tablet_id);
if (!st.ok()) {
mark_as_failed(channel, st.get_error_msg(), tablet_id);
mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
// continue add row to other node, the error will be checked for every batch outside
}
}
@@ -586,14 +592,14 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
// if this node channel is already failed, this add_row will be skipped
auto st = channel->add_row(block_row, tablet_id);
if (!st.ok()) {
mark_as_failed(channel, st.get_error_msg(), tablet_id);
mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
}
}
}

void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& err,
void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
int64_t tablet_id) {
const auto& it = _tablets_by_channel.find(ch->node_id());
const auto& it = _tablets_by_channel.find(node_id);
if (it == _tablets_by_channel.end()) {
return;
}
@@ -602,16 +608,16 @@ void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& err,
std::lock_guard<SpinLock> l(_fail_lock);
if (tablet_id == -1) {
for (const auto the_tablet_id : it->second) {
_failed_channels[the_tablet_id].insert(ch->node_id());
_failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + ch->host());
_failed_channels[the_tablet_id].insert(node_id);
_failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + host);
if (_failed_channels[the_tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
_intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[the_tablet_id]);
}
}
} else {
_failed_channels[tablet_id].insert(ch->node_id());
_failed_channels_msgs.emplace(tablet_id, err + ", host: " + ch->host());
_failed_channels[tablet_id].insert(node_id);
_failed_channels_msgs.emplace(tablet_id, err + ", host: " + host);
if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
_intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[tablet_id]);
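
The threshold in this hunk is a plain majority check over a tablet's replicas. Restated on its own, with illustrative values only:

```cpp
// With 3 replicas, (3 + 1) / 2 == 2: a tablet's load becomes intolerable once two
// of its replica channels have failed. With 1 replica, a single failure is fatal.
int majority_failed_threshold(int num_replicas) {
    return (num_replicas + 1) / 2;
}
```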
@@ -655,7 +661,7 @@ OlapTableSink::~OlapTableSink() {
// OlapTableSink::_mem_tracker and its parents.
// But their destructions are after OlapTableSink's.
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->clear_all_batches(); });
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_batches(); });
}
}

@@ -809,17 +815,17 @@ Status OlapTableSink::open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));

for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); });
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->open(); });
}

for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) {
index_channel->for_each_node_channel([&index_channel](const std::shared_ptr<NodeChannel>& ch) {
auto st = ch->open_wait();
if (!st.ok()) {
// The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel.
// This phase will not fail due to a single tablet.
// Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed.
index_channel->mark_as_failed(ch,
index_channel->mark_as_failed(ch->node_id(), ch->host(),
fmt::format("{}, open failed, err: {}",
ch->channel_info(), st.get_error_msg()),
-1);
@@ -935,7 +941,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->mark_close(); });
num_node_channels += index_channel->num_node_channels();
}

@@ -945,10 +951,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
[&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns, &actual_consume_ns,
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_add_batch_num](NodeChannel* ch) {
&total_add_batch_num](const std::shared_ptr<NodeChannel>& ch) {
auto s = ch->close_wait(state);
if (!s.ok()) {
index_channel->mark_as_failed(ch, s.get_error_msg(), -1);
index_channel->mark_as_failed(ch->node_id(), ch->host(), s.get_error_msg(), -1);
LOG(WARNING)
<< ch->channel_info()
<< ", close channel failed, err: " << s.get_error_msg();
@@ -1007,7 +1013,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
} else {
for (auto channel : _channels) {
channel->for_each_node_channel(
[&status](NodeChannel* ch) { ch->cancel(status.get_error_msg()); });
[&status](const std::shared_ptr<NodeChannel>& ch) { ch->cancel(status.get_error_msg()); });
}
}

@@ -1208,15 +1214,15 @@ void OlapTableSink::_send_batch_process() {
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) {
index_channel->for_each_node_channel([&running_channels_num, this](const std::shared_ptr<NodeChannel>& ch) {
running_channels_num +=
ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
});
}

if (running_channels_num == 0) {
LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), "
"consumer thread exit.";
"sender thread exit. " << print_id(_load_id);
return;
}
} while (!_stop_background_threads_latch.wait_for(
14 changes: 7 additions & 7 deletions be/src/exec/tablet_sink.h
@@ -92,7 +92,7 @@ class ReusableClosure : public google::protobuf::Closure {

static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }

void addFailedHandler(std::function<void()> fn) { failed_handler = fn; }
void addFailedHandler(std::function<void(bool)> fn) { failed_handler = fn; }
void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }

void join() {
Expand Down Expand Up @@ -126,7 +126,7 @@ class ReusableClosure : public google::protobuf::Closure {
if (cntl.Failed()) {
LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode())
<< ", error_text=" << cntl.ErrorText();
failed_handler();
failed_handler(_is_last_rpc);
} else {
success_handler(result, _is_last_rpc);
}
@@ -140,7 +140,7 @@ class ReusableClosure : public google::protobuf::Closure {
brpc::CallId cid;
std::atomic<bool> _packet_in_flight {false};
std::atomic<bool> _is_last_rpc {false};
std::function<void()> failed_handler;
std::function<void(bool)> failed_handler;
std::function<void(const T&, bool)> success_handler;
};
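
With this change both handlers learn whether the callback belongs to the final rpc. A simplified, non-authoritative sketch of the dispatch shape (brpc plumbing omitted, names invented):

```cpp
// Sketch only: the closure records whether the call it completes was the last rpc
// and forwards that flag to both the failure and the success handler.
#include <functional>

template <typename T>
struct ClosureSketch {
    bool is_last_rpc = false;
    bool rpc_failed = false;
    T result{};
    std::function<void(bool)> failed_handler;
    std::function<void(const T&, bool)> success_handler;

    void run() {
        if (rpc_failed) {
            failed_handler(is_last_rpc);
        } else {
            success_handler(result, is_last_rpc);
        }
    }
};
```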

@@ -288,13 +288,13 @@ class IndexChannel {

void add_row(BlockRow& block_row, int64_t tablet_id);

void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
void for_each_node_channel(const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
for (auto& it : _node_channels) {
func(it.second);
}
}

void mark_as_failed(const NodeChannel* ch, const std::string& err, int64_t tablet_id = -1);
void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id = -1);
Status check_intolerable_failure();

// set error tablet info in runtime state, so that it can be returned to FE.
@@ -310,9 +310,9 @@ class IndexChannel {
int32_t _schema_hash;

// BeId -> channel
std::unordered_map<int64_t, NodeChannel*> _node_channels;
std::unordered_map<int64_t, std::shared_ptr<NodeChannel>> _node_channels;
// from tablet_id to backend channel
std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
std::unordered_map<int64_t, std::vector<std::shared_ptr<NodeChannel>>> _channels_by_tablet;
// from backend channel to tablet_id
std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;

4 changes: 2 additions & 2 deletions be/src/olap/tablet_meta.cpp
@@ -601,8 +601,8 @@ void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
for (const auto& it : temp.sub_predicates()) {
del_cond_str += it + ";";
}
LOG(INFO) << "remove one del_pred. version=" << temp.version()
<< ", condition=" << del_cond_str;
VLOG_NOTICE << "remove one del_pred. version=" << temp.version()
<< ", condition=" << del_cond_str;

// remove delete condition from PB
_del_pred_array.SwapElements(ordinal, _del_pred_array.size() - 1);
12 changes: 10 additions & 2 deletions be/src/runtime/row_batch.cpp
@@ -254,7 +254,10 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
// row_tuples
_row_desc.to_protobuf(output_batch->mutable_row_tuples());
// tuple_offsets: must clear before reserve
// TODO(cmy): the tuple_offsets should be removed after v1.1.0, use new_tuple_offsets instead.
// Keeping tuple_offsets here is just for compatibility.
output_batch->clear_tuple_offsets();
output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
output_batch->clear_new_tuple_offsets();
output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
// is_compressed
@@ -279,7 +282,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
int64_t offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = mutable_tuple_data->data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
const auto& mutable_tuple_offsets = output_batch->mutable_new_tuple_offsets();
const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();

for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
@@ -288,10 +292,12 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
mutable_tuple_offsets->Add(-1);
mutable_new_tuple_offsets->Add(-1);
continue;
}
// Record offset before creating copy (which increments offset and tuple_data)
mutable_tuple_offsets->Add(offset);
mutable_tuple_offsets->Add((int32_t) offset);
mutable_new_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
CHECK_LE(offset, size);
}
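
Note on the serialization above: both offset arrays are written so that older readers can keep using the 32-bit `tuple_offsets` while `new_tuple_offsets` carries 64-bit offsets that cannot overflow on large batches. A rough sketch of the dual-write idea with invented field names (not the real PRowBatch API):

```cpp
// Sketch only: the legacy 32-bit array is kept for compatibility, the 64-bit array
// is the one able to represent offsets beyond INT32_MAX.
#include <cstdint>
#include <vector>

struct SerializedBatchSketch {
    std::vector<int32_t> tuple_offsets;      // legacy, kept for older readers
    std::vector<int64_t> new_tuple_offsets;  // preferred going forward
};

void record_offset(SerializedBatchSketch* out, int64_t offset_or_null) {
    // -1 encodes a NULL tuple in both arrays, mirroring the code above.
    out->tuple_offsets.push_back(static_cast<int32_t>(offset_or_null));
    out->new_tuple_offsets.push_back(offset_or_null);
}
```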
@@ -519,7 +525,9 @@ vectorized::Block RowBatch::convert_to_vec_block() const {
size_t RowBatch::get_batch_size(const PRowBatch& batch) {
size_t result = batch.tuple_data().size();
result += batch.row_tuples().size() * sizeof(int32_t);
// TODO(cmy): remove batch.tuple_offsets
result += batch.tuple_offsets().size() * sizeof(int32_t);
result += batch.new_tuple_offsets().size() * sizeof(int64_t);
return result;
}

4 changes: 2 additions & 2 deletions docs/en/administrator-guide/config/fe_config.md
@@ -2096,7 +2096,7 @@ When there are a large number of replicas waiting to be balanced or repaired in

### repair_slow_replica

Default: true
Default: false

IsMutable:true

@@ -2153,4 +2153,4 @@ Dynamically configured: true

Only for Master FE: true

The data size threshold used to judge whether replica is too large
The data size threshold used to judge whether replica is too large
2 changes: 1 addition & 1 deletion docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2116,7 +2116,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清

### repair_slow_replica

默认值:true
默认值:false

是否可以动态配置:true

@@ -1572,7 +1572,7 @@ public class Config extends ConfigBase {
* auto set the slowest compaction replica's status to bad
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean repair_slow_replica = true;
public static boolean repair_slow_replica = false;

/*
* The relocation of a colocation group may involve a large number of tablets moving within the cluster.