Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para

{
std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
auto commit_infos = state->tablet_commit_infos();
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
state->tablet_commit_infos().begin(),
state->tablet_commit_infos().end());
commit_infos.begin(), commit_infos.end());
}
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
Expand Down
27 changes: 9 additions & 18 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,35 +530,26 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
}
}
}
if (!req.runtime_state->tablet_commit_infos().empty()) {
if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->tablet_commit_infos().empty()) {
if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(),
rs->tablet_commit_infos().begin(),
rs->tablet_commit_infos().end());
params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
}
}
}
if (!req.runtime_state->error_tablet_infos().empty()) {
if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->error_tablet_infos().empty()) {
if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs->error_tablet_infos().begin(),
rs->error_tablet_infos().end());
params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
rs_eti.end());
}
}
}
Expand Down
27 changes: 21 additions & 6 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,19 +433,33 @@ class RuntimeState {
return _query_options.partitioned_hash_agg_rows_threshold;
}

const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
std::vector<TTabletCommitInfo> tablet_commit_infos() const {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
return _tablet_commit_infos;
}

std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; }
void add_tablet_commit_infos(std::vector<TTabletCommitInfo>& commit_infos) {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
std::make_move_iterator(commit_infos.begin()),
std::make_move_iterator(commit_infos.end()));
}

std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
std::vector<TErrorTabletInfo> error_tablet_infos() const {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
return _error_tablet_infos;
}

std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }
void add_error_tablet_infos(std::vector<TErrorTabletInfo>& tablet_infos) {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
_error_tablet_infos.insert(_error_tablet_infos.end(),
std::make_move_iterator(tablet_infos.begin()),
std::make_move_iterator(tablet_infos.end()));
}

const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; }
std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }

std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }
std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }

// local runtime filter mgr, the runtime filter do not have remote target or
// not need local merge should regist here. the instance exec finish, the local
Expand Down Expand Up @@ -712,6 +726,7 @@ class RuntimeState {
int64_t _error_row_number;
std::string _error_log_file_path;
std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path
mutable std::mutex _tablet_infos_mutex;
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
ctx->txn_id = state->wal_id();
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
ctx->commit_infos = state->tablet_commit_infos();
ctx->number_total_rows = state->num_rows_load_total();
ctx->number_loaded_rows = state->num_rows_load_success();
ctx->number_filtered_rows = state->num_rows_load_filtered();
Expand Down
21 changes: 11 additions & 10 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,18 @@ Status IndexChannel::check_intolerable_failure() {
}

void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
std::vector<TErrorTabletInfo>& error_tablet_infos = state->error_tablet_infos();
std::vector<TErrorTabletInfo> error_tablet_infos;

std::lock_guard<doris::SpinLock> l(_fail_lock);
for (const auto& it : _failed_channels_msgs) {
TErrorTabletInfo error_info;
error_info.__set_tabletId(it.first);
error_info.__set_msg(it.second);
error_tablet_infos.emplace_back(error_info);
{
std::lock_guard<doris::SpinLock> l(_fail_lock);
for (const auto& it : _failed_channels_msgs) {
TErrorTabletInfo error_info;
error_info.__set_tabletId(it.first);
error_info.__set_msg(it.second);
error_tablet_infos.emplace_back(error_info);
}
}
state->add_error_tablet_infos(error_tablet_infos);
}

void IndexChannel::set_tablets_received_rows(
Expand Down Expand Up @@ -967,9 +970,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {

if (_add_batches_finished) {
_close_check();
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(_tablet_commit_infos.begin()),
std::make_move_iterator(_tablet_commit_infos.end()));
_state->add_tablet_commit_infos(_tablet_commit_infos);

_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
RETURN_IF_ERROR(
_create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas));
_state->tablet_commit_infos().insert(
_state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
_state->add_tablet_commit_infos(tablet_commit_infos);
}

// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
Expand Down
Loading