diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1753991fe52386..c2e9ed8b0c5605 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -129,7 +130,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
     _instance_to_receiver_eof[low_id] = false;
-    _instance_to_rpc_time[low_id] = 0;
+    _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
+    _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
     _construct_request(low_id, finst_id);
 }
 
@@ -258,7 +260,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+
+            auto end_rpc_time = GetCurrentTimeNanos();
+            update_rpc_time(id, start_rpc_time, end_rpc_time);
+
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -335,7 +340,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+
+            auto end_rpc_time = GetCurrentTimeNanos();
+            update_rpc_time(id, start_rpc_time, end_rpc_time);
+
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -450,10 +458,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        if (time != 0) {
-            local_max_time = std::max(local_max_time, time);
-            local_min_time = std::min(local_min_time, time);
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        if (stats->sum_time != 0) {
+            local_max_time = std::max(local_max_time, stats->sum_time);
+            local_min_time = std::min(local_min_time, stats->sum_time);
         }
     }
     *max_time = local_max_time;
@@ -462,27 +470,32 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti
 
 int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
     int64_t sum_time = 0;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        sum_time += time;
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        sum_time += stats->sum_time;
     }
     return sum_time;
 }
 
-void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
-                                      int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time,
+                                         int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
-    DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
+    DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
     if (rpc_spend_time > 0) {
-        _instance_to_rpc_time[id] += rpc_spend_time;
+        ++_instance_to_rpc_stats[id]->rpc_count;
+        _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
+        _instance_to_rpc_stats[id]->max_time =
+                std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
+        _instance_to_rpc_stats[id]->min_time =
+                std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
     }
 }
 
 void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
-    auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
     auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
     auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
-    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+    auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
     auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
 
     int64_t max_rpc_time = 0, min_rpc_time = 0;
@@ -494,6 +507,38 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
+
+    auto max_count = _state->rpc_verbose_profile_max_instance_count();
+    if (_state->enable_verbose_profile() && max_count > 0) {
+        std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
+        for (const auto& stats : _instance_to_rpc_stats_vec) {
+            tmp_rpc_stats_vec.emplace_back(*stats);
+        }
+        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
+                [](const auto& a, const auto& b) { return a.max_time > b.max_time; });
+        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
+        int i = 0;
+        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
+        for (const auto& stats : tmp_rpc_stats_vec) {
+            if (0 == stats.rpc_count) {
+                continue;
+            }
+            std::stringstream out;
+            out << "Instance " << std::hex << stats.inst_lo_id;
+            auto stats_str = fmt::format(
+                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
+                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
+                    PrettyPrinter::print(
+                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
+                            TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
+            detail_profile->add_info_string(out.str(), stats_str);
+            if (++i == count) {
+                break;
+            }
+        }
+    }
 }
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 13692532a335a4..22a1452f8d545c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -180,7 +180,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
     void close();
-    void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
+    void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -215,7 +215,16 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
 
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
-    phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+    struct RpcInstanceStatistics {
+        RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
+        InstanceLoId inst_lo_id;
+        int64_t rpc_count = 0;
+        int64_t max_time = 0;
+        int64_t min_time = INT64_MAX;
+        int64_t sum_time = 0;
+    };
+    std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
+    phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index deafb361c5702a..f2fbbfa38f96df 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -81,6 +81,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     get_data_from_recvr_timer = ADD_TIMER(_runtime_profile, "GetDataFromRecvrTime");
     filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
     create_merger_timer = ADD_TIMER(_runtime_profile, "CreateMergerTime");
+    _runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id()));
     return Status::OK();
 }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index fcbe8519ef17bc..0d2c03e5c90ac4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -472,6 +472,17 @@ class RuntimeState {
                _query_options.enable_scan_node_run_serial;
     }
 
+    bool enable_verbose_profile() const {
+        return enable_profile() && _query_options.__isset.enable_verbose_profile &&
+               _query_options.enable_verbose_profile;
+    }
+
+    int rpc_verbose_profile_max_instance_count() const {
+        return _query_options.__isset.rpc_verbose_profile_max_instance_count
+                       ? _query_options.rpc_verbose_profile_max_instance_count
+                       : 0;
+    }
+
     bool enable_share_hash_table_for_broadcast_join() const {
         return _query_options.__isset.enable_share_hash_table_for_broadcast_join &&
                _query_options.enable_share_hash_table_for_broadcast_join;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3ac50c9d7b53a1..8c939ec1827152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -95,6 +95,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String MAX_EXECUTION_TIME = "max_execution_time";
     public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
+    public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile";
+    public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count";
     public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms";
     public static final String SQL_MODE = "sql_mode";
     public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -789,6 +791,12 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
+    public boolean enableVerboseProfile = false;
+
+    @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true)
+    public int rpcVerboseProfileMaxInstanceCount = 5;
+
     // When enable_profile is true, profile of queries that costs more than autoProfileThresholdMs
     // will be stored to disk.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
@@ -3816,6 +3824,8 @@ public TQueryOptions toThrift() {
         tResult.setQueryTimeout(queryTimeoutS);
         tResult.setEnableProfile(enableProfile);
+        tResult.setEnableVerboseProfile(enableVerboseProfile);
+        tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
         if (enableProfile) {
             // If enable profile == true, then also set report success to true
             // be need report success to start report thread. But it is very tricky
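Usage sketch: the verbose RPC details are gated twice. RuntimeState::enable_verbose_profile() requires enable_profile() to also be true, and rpc_verbose_profile_max_instance_count must be positive. Assuming the session variables land exactly as declared in SessionVariable.java above, the new path can be exercised from a MySQL client like this:

    -- enable_verbose_profile only takes effect together with enable_profile
    SET enable_profile = true;
    SET enable_verbose_profile = true;
    -- report at most 10 instances in the RpcInstanceDetails child profile
    -- (default is 5; 0 disables the verbose RPC section)
    SET rpc_verbose_profile_max_instance_count = 10;

After a query runs, the exchange sink's profile should then contain an RpcInstanceDetails child with one "Instance <lo_id in hex>" entry per instance (Count, MaxTime, MinTime, AvgTime, SumTime), sorted by per-instance max RPC time in descending order.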