Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ Status NodeChannel::init(RuntimeState* state) {

_rpc_timeout_ms = state->query_options().query_timeout * 1000;

_load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id" +
std::to_string(_parent->_txn_id);
_load_info = "load_id=" + print_id(_parent->_load_id) +
", txn_id=" + std::to_string(_parent->_txn_id);
_name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]";
return Status::OK();
}
Expand Down Expand Up @@ -141,7 +141,7 @@ Status NodeChannel::open_wait() {
_add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this]() {
_cancelled = true;
LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info()
LOG(WARNING) << name() << " add batch req rpc failed, " << print_load_info()
<< ", node=" << node_info()->host << ":" << node_info()->brpc_port;
});

Expand All @@ -160,7 +160,7 @@ Status NodeChannel::open_wait() {
}
} else {
_cancelled = true;
LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, "
LOG(WARNING) << name() << " add batch req success but status isn't ok, "
<< print_load_info() << ", node=" << node_info()->host << ":"
<< node_info()->brpc_port << ", errmsg=" << status.get_error_msg();
}
Expand Down Expand Up @@ -248,13 +248,12 @@ Status NodeChannel::close_wait(RuntimeState* state) {
timer.stop();
VLOG(1) << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms";

{
std::lock_guard<std::mutex> lg(_pending_batches_lock);
DCHECK(_pending_batches.empty());
DCHECK(_cur_batch == nullptr);
}

if (_add_batches_finished) {
{
std::lock_guard<std::mutex> lg(_pending_batches_lock);
CHECK(_pending_batches.empty()) << name();
CHECK(_cur_batch == nullptr) << name();
}
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(_tablet_commit_infos.begin()),
std::make_move_iterator(_tablet_commit_infos.end()));
Expand All @@ -280,15 +279,6 @@ void NodeChannel::cancel() {
closure->cntl.set_timeout_ms(_rpc_timeout_ms);
_stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure);
request.release_id();

// Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors).
// Delete RowBatches here is a better choice to reduce the potential of dtor errors.
{
std::lock_guard<std::mutex> lg(_pending_batches_lock);
std::queue<AddBatchReq> empty;
std::swap(_pending_batches, empty);
_cur_batch.reset();
}
}

int NodeChannel::try_send_and_fetch_status() {
Expand Down Expand Up @@ -358,6 +348,13 @@ Status NodeChannel::none_of(std::initializer_list<bool> vars) {
return st;
}

void NodeChannel::clear_all_batches() {
std::lock_guard<std::mutex> lg(_pending_batches_lock);
std::queue<AddBatchReq> empty;
std::swap(_pending_batches, empty);
_cur_batch.reset();
}

IndexChannel::~IndexChannel() {}

Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
Expand Down Expand Up @@ -419,7 +416,15 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
}
}

OlapTableSink::~OlapTableSink() {}
OlapTableSink::~OlapTableSink() {
// We clear NodeChannels' batches here, cuz NodeChannels' batches destruction will use
// OlapTableSink::_mem_tracker and its parents.
// But their destructions are after OlapTableSink's.
// TODO: can be remove after all MemTrackers become shared.
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->clear_all_batches(); });
}
}

Status OlapTableSink::init(const TDataSink& t_sink) {
DCHECK(t_sink.__isset.olap_table_sink);
Expand Down Expand Up @@ -575,7 +580,7 @@ Status OlapTableSink::open(RuntimeState* state) {
index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) {
auto st = ch->open_wait();
if (!st.ok()) {
LOG(WARNING) << "tablet open failed, " << ch->print_load_info()
LOG(WARNING) << ch->name() << ": tablet open failed, " << ch->print_load_info()
<< ", node=" << ch->node_info()->host << ":"
<< ch->node_info()->brpc_port << ", errmsg=" << st.get_error_msg();
index_channel->mark_as_failed(ch);
Expand Down Expand Up @@ -661,8 +666,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel(
[](NodeChannel* ch) { WARN_IF_ERROR(ch->mark_close(), ""); });
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
}

for (auto index_channel : _channels) {
Expand All @@ -672,7 +676,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
&actual_consume_ns](NodeChannel* ch) {
status = ch->close_wait(state);
if (!status.ok()) {
LOG(WARNING) << "close channel failed, " << ch->print_load_info();
LOG(WARNING)
<< ch->name() << ": close channel failed, " << ch->print_load_info()
<< ". error_msg=" << status.get_error_msg();
}
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns,
Expand Down Expand Up @@ -791,7 +797,8 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
if (desc->is_nullable() && tuple->is_null(desc->null_indicator_offset())) {
if (desc->type().type == TYPE_OBJECT) {
ss << "null is not allowed for bitmap column, column_name: " << desc->col_name();
ss << "null is not allowed for bitmap column, column_name: "
<< desc->col_name();
row_valid = false;
}
continue;
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ class NodeChannel {

Status none_of(std::initializer_list<bool> vars);

void clear_all_batches();

private:
OlapTableSink* _parent = nullptr;
int64_t _index_id = -1;
Expand Down
35 changes: 21 additions & 14 deletions be/src/runtime/routine_load/routine_load_task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class TRoutineLoadTask;
// to FE finally.
class RoutineLoadTaskExecutor {
public:
typedef std::function<void (StreamLoadContext*)> ExecFinishCallback;
typedef std::function<void(StreamLoadContext*)> ExecFinishCallback;

RoutineLoadTaskExecutor(ExecEnv* exec_env):
_exec_env(exec_env),
_thread_pool(config::routine_load_thread_pool_size, 1),
_data_consumer_pool(10) {
RoutineLoadTaskExecutor(ExecEnv* exec_env)
: _exec_env(exec_env),
_thread_pool(config::routine_load_thread_pool_size, 1),
_data_consumer_pool(10) {
REGISTER_GAUGE_DORIS_METRIC(routine_load_task_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
return _task_map.size();
Expand All @@ -58,21 +58,28 @@ class RoutineLoadTaskExecutor {
~RoutineLoadTaskExecutor() {
_thread_pool.shutdown();
_thread_pool.join();

LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
for (auto it = _task_map.begin(); it != _task_map.end(); ++it) {
auto ctx = it->second;
if (ctx->unref()) {
delete ctx;
}
}
_task_map.clear();
}

// submit a routine load task
Status submit_task(const TRoutineLoadTask& task);

Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids);

Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
std::vector<int32_t>* partition_ids);

private:
// execute the task
void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb);

void err_handler(
StreamLoadContext* ctx,
const Status& st,
const std::string& err_msg);

void err_handler(StreamLoadContext* ctx, const Status& st, const std::string& err_msg);

// for test only
Status _execute_plan_for_test(StreamLoadContext* ctx);
Expand All @@ -87,4 +94,4 @@ class RoutineLoadTaskExecutor {
std::unordered_map<UniqueId, StreamLoadContext*> _task_map;
};

} // end namespace
} // namespace doris
12 changes: 11 additions & 1 deletion be/test/exec/tablet_sink_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,13 +784,15 @@ TEST_F(OlapTableSinkTest, add_batch_failed) {
_server->Start(4356, &options);
}

// ObjectPool create before RuntimeState, simulate actual situation better.
ObjectPool obj_pool;

TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());

ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);

Expand Down Expand Up @@ -859,9 +861,17 @@ TEST_F(OlapTableSinkTest, add_batch_failed) {
memcpy(str_val->ptr, "abc", str_val->len);
batch.commit_last_row();
}

// Channels will be cancelled internally, coz brpc returns k_add_batch_status.
k_add_batch_status = Status::InternalError("dummy failed");
st = sink.send(&state, &batch);
ASSERT_TRUE(st.ok());

// Send batch multiple times, can make _cur_batch or _pending_batches(in channels) not empty.
// To ensure the order of releasing resource is OK.
sink.send(&state, &batch);
sink.send(&state, &batch);

// close
st = sink.close(&state, Status::OK());
ASSERT_FALSE(st.ok());
Expand Down
Loading