Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <pdqsort.h>
#include <stddef.h>

#include <atomic>
Expand Down Expand Up @@ -176,7 +177,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) {
_rpc_channel_is_idle[low_id] = true;
_instance_to_rpc_ctx[low_id] = {};
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
}

Expand Down Expand Up @@ -298,7 +300,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());
if (_state->enable_verbose_profile()) {
auto end_rpc_time = GetCurrentTimeNanos();
update_rpc_time(id, start_rpc_time, end_rpc_time);
}
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
Expand Down Expand Up @@ -376,7 +381,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
set_rpc_time(id, start_rpc_time, result.receive_time());
if (_state->enable_verbose_profile()) {
auto end_rpc_time = GetCurrentTimeNanos();
update_rpc_time(id, start_rpc_time, end_rpc_time);
}
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
Expand Down Expand Up @@ -491,10 +499,10 @@ template <typename Parent>
void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
for (auto& [id, time] : _instance_to_rpc_time) {
if (time != 0) {
local_max_time = std::max(local_max_time, time);
local_min_time = std::min(local_min_time, time);
for (auto& [id, stats] : _instance_to_rpc_stats) {
if (stats->sum_time != 0) {
local_max_time = std::max(local_max_time, stats->sum_time);
local_min_time = std::min(local_min_time, stats->sum_time);
}
}
*max_time = local_max_time;
Expand All @@ -504,20 +512,25 @@ void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t
template <typename Parent>
int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
int64_t sum_time = 0;
for (auto& [id, time] : _instance_to_rpc_time) {
sum_time += time;
for (auto& [id, stats] : _instance_to_rpc_stats) {
sum_time += stats->sum_time;
}
return sum_time;
}

template <typename Parent>
void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
void ExchangeSinkBuffer<Parent>::update_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
if (rpc_spend_time > 0) {
_instance_to_rpc_time[id] += rpc_spend_time;
++_instance_to_rpc_stats[id]->rpc_count;
_instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
_instance_to_rpc_stats[id]->max_time =
std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
_instance_to_rpc_stats[id]->min_time =
std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
}
}

Expand All @@ -538,6 +551,36 @@ void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));

if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
auto max_count = _state->rpc_verbose_profile_max_instance_count();
if (_state->enable_verbose_profile() && max_count > 0) {
pdqsort(_instance_to_rpc_stats_vec.begin(), _instance_to_rpc_stats_vec.end(),
[](const auto& a, const auto& b) { return a->max_time > b->max_time; });
auto count = std::min((size_t)max_count, _instance_to_rpc_stats_vec.size());
int i = 0;
auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个profile 对象得检查下是否为nullptr

for (const auto& stats : _instance_to_rpc_stats_vec) {
if (0 == stats->rpc_count) {
continue;
}
std::stringstream out;
out << "Instance " << std::hex << stats->inst_lo_id;
auto stats_str = fmt::format(
"Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
stats->rpc_count, PrettyPrinter::print(stats->max_time, TUnit::TIME_NS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloseTime: avg 11.310us, max 11.310us, min 11.310us
- ExecTime: avg 150.158us, max 150.158us, min 150.158us
- InitTime: avg 72.193us, max 72.193us, min 72.193us

PrettyPrinter::print(stats->min_time, TUnit::TIME_NS),
PrettyPrinter::print(stats->sum_time / std::max(static_cast<int64_t>(1),
stats->rpc_count),
TUnit::TIME_NS),
PrettyPrinter::print(stats->sum_time, TUnit::TIME_NS));
detail_profile->add_info_string(out.str(), stats_str);
if (++i == count) {
break;
}
}
}
}
}

template class ExchangeSinkBuffer<vectorized::VDataStreamSender>;
Expand Down
14 changes: 12 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -209,7 +210,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
bool can_write() const;
bool is_pending_finish();
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<Dependency> queue_dependency,
Expand Down Expand Up @@ -252,7 +253,16 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
// Number of busy channels;
std::atomic<int> _busy_channels = 0;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
// Accumulated RPC timing statistics for one destination instance.
// Time fields are presumably in nanoseconds (the values are rendered with TUnit::TIME_NS) — confirm
// against the code that records samples.
struct RpcInstanceStatistics {
    // explicit: an instance id must not silently convert into a statistics record.
    explicit RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
    // Identifier of the instance these statistics belong to.
    InstanceLoId inst_lo_id;
    // Number of RPC samples recorded so far.
    int64_t rpc_count = 0;
    // Largest single recorded sample; stays 0 until a sample is recorded.
    int64_t max_time = 0;
    // Smallest single recorded sample. Starts at INT64_MAX so that the first std::min() against a
    // real sample replaces it; it remains INT64_MAX while rpc_count == 0.
    int64_t min_time = INT64_MAX;
    // Sum of all recorded samples.
    int64_t sum_time = 0;
};
std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;
phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;

std::atomic<bool> _is_finishing;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
TUnit ::TIME_NS, timer_name, 1);
}

_runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id()));

return Status::OK();
}

Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,17 @@ class RuntimeState {
return _query_options.__isset.enable_profile && _query_options.enable_profile;
}

// Verbose profiling is active only when profiling itself is enabled and the
// enable_verbose_profile query option is both present and true.
bool enable_verbose_profile() const {
    if (!enable_profile()) {
        return false;
    }
    const bool option_present = _query_options.__isset.enable_verbose_profile;
    return option_present && _query_options.enable_verbose_profile;
}

// Returns the rpc_verbose_profile_max_instance_count query option, or 0 when the option is unset.
int rpc_verbose_profile_max_instance_count() const {
    if (!_query_options.__isset.rpc_verbose_profile_max_instance_count) {
        return 0;
    }
    return _query_options.rpc_verbose_profile_max_instance_count;
}

bool enable_scan_node_run_serial() const {
return _query_options.__isset.enable_scan_node_run_serial &&
_query_options.enable_scan_node_run_serial;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ private void addScanRangeLocations(Partition partition,
int useFixReplica = -1;
boolean needCheckTags = false;
boolean skipMissingVersion = false;
Set<Long> userSetBackendBlacklist = null;
if (ConnectContext.get() != null) {
allowedTags = ConnectContext.get().getResourceTags();
needCheckTags = ConnectContext.get().isResourceTagsSet();
Expand All @@ -751,6 +752,7 @@ private void addScanRangeLocations(Partition partition,
LOG.debug("query id: {}, partition id:{} visibleVersion: {}",
DebugUtil.printId(ConnectContext.get().queryId()), partition.getId(), visibleVersion);
}
userSetBackendBlacklist = ConnectContext.get().getSessionVariable().getQueryBackendBlacklist();
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
Expand Down Expand Up @@ -866,6 +868,16 @@ private void addScanRangeLocations(Partition partition,
+ " does not exist or not alive");
continue;
}
if (userSetBackendBlacklist != null && userSetBackendBlacklist.contains(backend.getId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} is in the blacklist that user set in session variable {}",
replica.getBackendId(), replica.getId());
}
String err = "replica " + replica.getId() + "'s backend " + replica.getBackendId()
+ " in the blacklist that user set in session variable";
errs.add(err);
continue;
}
if (!backend.isMixNode()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile";
public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count";
public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms";
public static final String SQL_MODE = "sql_mode";
public static final String WORKLOAD_VARIABLE = "workload_group";
Expand Down Expand Up @@ -415,6 +417,8 @@ public class SessionVariable implements Serializable, Writable {
// fix replica to query. If num = 1, query the smallest replica, if 2 is the second smallest replica.
public static final String USE_FIX_REPLICA = "use_fix_replica";

public static final String QUERY_BACKEND_BLACKLIST = "query_backend_blacklist";

public static final String DRY_RUN_QUERY = "dry_run_query";

// Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
Expand Down Expand Up @@ -705,6 +709,12 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;

@VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
public boolean enableVerboseProfile = true;

@VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true)
public int rpcVerboseProfileMaxInstanceCount = 5;

// if true, need report to coordinator when plan fragment execute successfully.
@VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
public int autoProfileThresholdMs = -1;
Expand Down Expand Up @@ -1475,6 +1485,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = USE_FIX_REPLICA)
public int useFixReplica = -1;

// This is a debug feature. When we find that a backend is not stable (for example, for network
// reasons), we can use this variable to exclude it from the query plan. It is only used for
// queries, not for load jobs.
// Users can set multiple backend ids separated by commas, e.g. "10111,10112".
@VariableMgr.VarAttr(name = QUERY_BACKEND_BLACKLIST, needForward = true)
public String queryBackendBlacklist = "";

@VariableMgr.VarAttr(name = DUMP_NEREIDS_MEMO)
public boolean dumpNereidsMemo = false;

Expand Down Expand Up @@ -3386,6 +3403,8 @@ public TQueryOptions toThrift() {

tResult.setQueryTimeout(queryTimeoutS);
tResult.setEnableProfile(enableProfile);
tResult.setEnableVerboseProfile(enableVerboseProfile);
tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
if (enableProfile) {
// If enable profile == true, then also set report success to true
// be need report success to start report thread. But it is very tricky
Expand Down Expand Up @@ -4050,6 +4069,21 @@ public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistributi
this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
}

// Parses the comma-separated backend ids held in queryBackendBlacklist into a set.
// Malformed ids are intentionally not handled here: Long.parseLong throws and the
// exception is left to propagate so that the query is rejected.
public Set<Long> getQueryBackendBlacklist() {
    Set<Long> excludedBackendIds = Sets.newHashSet();
    if (!Strings.isNullOrEmpty(queryBackendBlacklist)) {
        for (String token : queryBackendBlacklist.trim().split(",")) {
            excludedBackendIds.add(Long.parseLong(token.trim()));
        }
    }
    return excludedBackendIds;
}

public boolean isForceJniScanner() {
return forceJniScanner;
}
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ struct TQueryOptions {

127: optional i32 runtime_bloom_filter_max_size = 16777216;

128: optional bool enable_verbose_profile = false;
129: optional i32 rpc_verbose_profile_max_instance_count = 0;

// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
Expand Down