Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 163 additions & 171 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,208 +325,200 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {

DCHECK(req.runtime_state != nullptr);

if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
// this is a load plan, and load is not finished, just make a brief report
if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
} else {
if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
}
params.__isset.detailed_report = true;
DCHECK(!req.runtime_states.empty());
const bool enable_profile = (*req.runtime_states.begin())->enable_profile();
if (enable_profile) {
params.__isset.profile = true;
params.__isset.loadChannelProfile = false;
for (auto* rs : req.runtime_states) {
DCHECK(req.load_channel_profile);
TDetailedReportParams detailed_param;
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
// merge all runtime_states.loadChannelProfile to req.load_channel_profile
req.load_channel_profile->update(detailed_param.loadChannelProfile);
}
req.load_channel_profile->to_thrift(&params.loadChannelProfile);
} else {
params.__isset.profile = false;
}
params.__isset.detailed_report = true;
DCHECK(!req.runtime_states.empty());
const bool enable_profile = (*req.runtime_states.begin())->enable_profile();
if (enable_profile) {
params.__isset.profile = true;
params.__isset.loadChannelProfile = false;
for (auto* rs : req.runtime_states) {
DCHECK(req.load_channel_profile);
TDetailedReportParams detailed_param;
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
// merge all runtime_states.loadChannelProfile to req.load_channel_profile
req.load_channel_profile->update(detailed_param.loadChannelProfile);
}
req.load_channel_profile->to_thrift(&params.loadChannelProfile);
} else {
params.__isset.profile = false;
}

if (enable_profile) {
DCHECK(req.profile != nullptr);
if (enable_profile) {
DCHECK(req.profile != nullptr);
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
detailed_param.__set_is_fragment_level(true);
req.profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(detailed_param);
for (auto pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
detailed_param.__set_is_fragment_level(true);
req.profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(detailed_param);
for (auto pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
pipeline_profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(std::move(detailed_param));
}
pipeline_profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(std::move(detailed_param));
}
if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : req.runtime_state->output_files()) {
}
if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
for (auto& it : rs->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
for (auto& it : rs->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
if (!params.delta_urls.empty()) {
params.__isset.delta_urls = true;
}
}
if (!params.delta_urls.empty()) {
params.__isset.delta_urls = true;
}
}

// load rows
static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
params.__isset.load_counters = true;

num_rows_load_success = req.runtime_state->num_rows_load_success();
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
params.__isset.load_counters = true;
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
params.fragment_instance_reports.push_back(t);
}
// load rows
static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
params.__isset.load_counters = true;

num_rows_load_success = req.runtime_state->num_rows_load_success();
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
params.__isset.load_counters = true;
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
params.fragment_instance_reports.push_back(t);
}
}
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->get_error_log_file_path().empty()) {
params.__set_tracking_url(to_load_error_http_path(rs->get_error_log_file_path()));
}
}
if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files = req.runtime_state->export_output_files();
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files.insert(params.export_files.end(),
rs->export_output_files().begin(),
rs->export_output_files().end());
}
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
if (!req.runtime_state->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(),
rs->tablet_commit_infos().begin(),
rs->tablet_commit_infos().end());
}
}
if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files = req.runtime_state->export_output_files();
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files.insert(params.export_files.end(),
rs->export_output_files().begin(),
rs->export_output_files().end());
}
}
if (!req.runtime_state->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
if (!req.runtime_state->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(),
rs->tablet_commit_infos().begin(),
rs->tablet_commit_infos().end());
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs->error_tablet_infos().begin(),
rs->error_tablet_infos().end());
}
}
}
if (!req.runtime_state->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs->error_tablet_infos().begin(),
rs->error_tablet_infos().end());
}
}
}

if (!req.runtime_state->hive_partition_updates().empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.reserve(
req.runtime_state->hive_partition_updates().size());
for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
params.hive_partition_updates.push_back(hive_partition_update);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->hive_partition_updates().empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
rs->hive_partition_updates().begin(),
rs->hive_partition_updates().end());
}
if (!req.runtime_state->hive_partition_updates().empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size());
for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
params.hive_partition_updates.push_back(hive_partition_update);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->hive_partition_updates().empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
rs->hive_partition_updates().begin(),
rs->hive_partition_updates().end());
}
}
}

if (!req.runtime_state->iceberg_commit_datas().empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) {
params.iceberg_commit_datas.push_back(iceberg_commit_data);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->iceberg_commit_datas().empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
rs->iceberg_commit_datas().begin(),
rs->iceberg_commit_datas().end());
}
if (!req.runtime_state->iceberg_commit_datas().empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) {
params.iceberg_commit_datas.push_back(iceberg_commit_data);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->iceberg_commit_datas().empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
rs->iceberg_commit_datas().begin(),
rs->iceberg_commit_datas().end());
}
}

// Send new errors to coordinator
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (params.error_log.size() > 0);
}

// Send new errors to coordinator
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (!params.error_log.empty());

if (_exec_env->master_info()->__isset.backend_id) {
params.__set_backend_id(_exec_env->master_info()->backend_id);
}
Expand Down