75 changes: 60 additions & 15 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -25,6 +25,7 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <pdqsort.h>
#include <stddef.h>

#include <atomic>
@@ -129,7 +130,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
}

@@ -258,7 +260,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());

auto end_rpc_time = GetCurrentTimeNanos();
update_rpc_time(id, start_rpc_time, end_rpc_time);

Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
@@ -335,7 +340,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());

auto end_rpc_time = GetCurrentTimeNanos();
update_rpc_time(id, start_rpc_time, end_rpc_time);

Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
@@ -450,10 +458,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
for (auto& [id, time] : _instance_to_rpc_time) {
if (time != 0) {
local_max_time = std::max(local_max_time, time);
local_min_time = std::min(local_min_time, time);
for (auto& [id, stats] : _instance_to_rpc_stats) {
if (stats->sum_time != 0) {
local_max_time = std::max(local_max_time, stats->sum_time);
local_min_time = std::min(local_min_time, stats->sum_time);
}
}
*max_time = local_max_time;
@@ -462,27 +470,32 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {

int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
int64_t sum_time = 0;
for (auto& [id, time] : _instance_to_rpc_time) {
sum_time += time;
for (auto& [id, stats] : _instance_to_rpc_stats) {
sum_time += stats->sum_time;
}
return sum_time;
}

void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
if (rpc_spend_time > 0) {
_instance_to_rpc_time[id] += rpc_spend_time;
++_instance_to_rpc_stats[id]->rpc_count;
_instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
_instance_to_rpc_stats[id]->max_time =
std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
_instance_to_rpc_stats[id]->min_time =
std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
}
}

void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

int64_t max_rpc_time = 0, min_rpc_time = 0;
@@ -494,6 +507,38 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));

auto max_count = _state->rpc_verbose_profile_max_instance_count();
if (_state->enable_verbose_profile() && max_count > 0) {
std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
for (const auto& stats : _instance_to_rpc_stats_vec) {
tmp_rpc_stats_vec.emplace_back(*stats);
}
pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
[](const auto& a, const auto& b) { return a.max_time > b.max_time; });
auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
int i = 0;
auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
for (const auto& stats : tmp_rpc_stats_vec) {
if (0 == stats.rpc_count) {
continue;
}
std::stringstream out;
out << "Instance " << std::hex << stats.inst_lo_id;
auto stats_str = fmt::format(
"Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
PrettyPrinter::print(
stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
TUnit::TIME_NS),
PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
detail_profile->add_info_string(out.str(), stats_str);
if (++i == count) {
break;
}
}
}
}

} // namespace pipeline
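Note (not part of the diff): with the child profile created above, each reported instance appears under RpcInstanceDetails as an info string keyed by the hex instance id; an illustrative entry, with made-up values, would look like "Instance 4f2a1c" -> "Count: 12, MaxTime: 2ms, MinTime: 150us, AvgTime: 800us, SumTime: 9.6ms".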
13 changes: 11 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -180,7 +180,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -215,7 +215,16 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;

phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
struct RpcInstanceStatistics {
RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
InstanceLoId inst_lo_id;
int64_t rpc_count = 0;
int64_t max_time = 0;
int64_t min_time = INT64_MAX;
int64_t sum_time = 0;
};
std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;

std::atomic<bool> _is_finishing;
PUniqueId _query_id;
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_source_operator.cpp
@@ -81,6 +81,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
get_data_from_recvr_timer = ADD_TIMER(_runtime_profile, "GetDataFromRecvrTime");
filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
create_merger_timer = ADD_TIMER(_runtime_profile, "CreateMergerTime");
_runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id()));

return Status::OK();
}
11 changes: 11 additions & 0 deletions be/src/runtime/runtime_state.h
@@ -472,6 +472,17 @@ class RuntimeState {
_query_options.enable_scan_node_run_serial;
}

bool enable_verbose_profile() const {
return enable_profile() && _query_options.__isset.enable_verbose_profile &&
_query_options.enable_verbose_profile;
}

int rpc_verbose_profile_max_instance_count() const {
return _query_options.__isset.rpc_verbose_profile_max_instance_count
? _query_options.rpc_verbose_profile_max_instance_count
: 0;
}

bool enable_share_hash_table_for_broadcast_join() const {
return _query_options.__isset.enable_share_hash_table_for_broadcast_join &&
_query_options.enable_share_hash_table_for_broadcast_join;
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -95,6 +95,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile";
public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count";
public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms";
public static final String SQL_MODE = "sql_mode";
public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -789,6 +791,12 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;

@VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
public boolean enableVerboseProfile = false;

@VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true)
public int rpcVerboseProfileMaxInstanceCount = 5;

// When enable_profile is true, profile of queries that costs more than autoProfileThresholdMs
// will be stored to disk.
@VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
Expand Down Expand Up @@ -3816,6 +3824,8 @@ public TQueryOptions toThrift() {

tResult.setQueryTimeout(queryTimeoutS);
tResult.setEnableProfile(enableProfile);
tResult.setEnableVerboseProfile(enableVerboseProfile);
tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
if (enableProfile) {
// If enable profile == true, then also set report success to true
// be need report success to start report thread. But it is very tricky
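Usage note (not part of the diff; assumes the standard Doris session-variable syntax): the per-instance details are gated behind the existing profile switch, so a session would enable them with SET enable_profile = true; SET enable_verbose_profile = true; and, optionally, SET rpc_verbose_profile_max_instance_count = 10; to report more than the default 5 instances.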