diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 39a6a59bd49f01..c4bb736c6f9323 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -176,7 +177,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _rpc_channel_is_idle[low_id] = true; _instance_to_rpc_ctx[low_id] = {}; _instance_to_receiver_eof[low_id] = false; - _instance_to_rpc_time[low_id] = 0; + _instance_to_rpc_stats_vec.emplace_back(std::make_shared(low_id)); + _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get(); _construct_request(low_id, finst_id); } @@ -298,7 +300,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - set_rpc_time(id, start_rpc_time, result.receive_time()); + if (_state->enable_verbose_profile()) { + auto end_rpc_time = GetCurrentTimeNanos(); + update_rpc_time(id, start_rpc_time, end_rpc_time); + } Status s(Status::create(result.status())); if (s.is()) { _set_receiver_eof(id); @@ -376,7 +381,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - set_rpc_time(id, start_rpc_time, result.receive_time()); + if (_state->enable_verbose_profile()) { + auto end_rpc_time = GetCurrentTimeNanos(); + update_rpc_time(id, start_rpc_time, end_rpc_time); + } Status s(Status::create(result.status())); if (s.is()) { _set_receiver_eof(id); @@ -491,10 +499,10 @@ template void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { int64_t local_max_time = 0; int64_t local_min_time = INT64_MAX; - for (auto& [id, time] : _instance_to_rpc_time) { - if (time != 0) { - local_max_time = std::max(local_max_time, time); - local_min_time = std::min(local_min_time, 
time); + for (auto& [id, stats] : _instance_to_rpc_stats) { + if (stats->sum_time != 0) { + local_max_time = std::max(local_max_time, stats->sum_time); + local_min_time = std::min(local_min_time, stats->sum_time); } } *max_time = local_max_time; @@ -504,20 +512,25 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t template int64_t ExchangeSinkBuffer::get_sum_rpc_time() { int64_t sum_time = 0; - for (auto& [id, time] : _instance_to_rpc_time) { - sum_time += time; + for (auto& [id, stats] : _instance_to_rpc_stats) { + sum_time += stats->sum_time; } return sum_time; } template -void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, - int64_t receive_rpc_time) { +void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time, + int64_t receive_rpc_time) { _rpc_count++; int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; - DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); + DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end()); if (rpc_spend_time > 0) { - _instance_to_rpc_time[id] += rpc_spend_time; + ++_instance_to_rpc_stats[id]->rpc_count; + _instance_to_rpc_stats[id]->sum_time += rpc_spend_time; + _instance_to_rpc_stats[id]->max_time = + std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time); + _instance_to_rpc_stats[id]->min_time = + std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time); } } @@ -538,6 +551,36 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { int64_t sum_time = get_sum_rpc_time(); _sum_rpc_timer->set(sum_time); _avg_rpc_timer->set(sum_time / std::max(static_cast(1), _rpc_count.load())); + + if constexpr (std::is_same_v) { + auto max_count = _state->rpc_verbose_profile_max_instance_count(); + if (_state->enable_verbose_profile() && max_count > 0) { + pdqsort(_instance_to_rpc_stats_vec.begin(), _instance_to_rpc_stats_vec.end(), + [](const auto& a, const auto& b) { return a->max_time > b->max_time; }); + 
auto count = std::min((size_t)max_count, _instance_to_rpc_stats_vec.size()); + int i = 0; + auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true); + for (const auto& stats : _instance_to_rpc_stats_vec) { + if (0 == stats->rpc_count) { + continue; + } + std::stringstream out; + out << "Instance " << std::hex << stats->inst_lo_id; + auto stats_str = fmt::format( + "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}", + stats->rpc_count, PrettyPrinter::print(stats->max_time, TUnit::TIME_NS), + PrettyPrinter::print(stats->min_time, TUnit::TIME_NS), + PrettyPrinter::print(stats->sum_time / std::max(static_cast(1), + stats->rpc_count), + TUnit::TIME_NS), + PrettyPrinter::print(stats->sum_time, TUnit::TIME_NS)); + detail_profile->add_info_string(out.str(), stats_str); + if (++i == count) { + break; + } + } + } + } } template class ExchangeSinkBuffer; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index cd5502ee6d0e2a..5ce6d75b149a48 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -209,7 +210,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { bool can_write() const; bool is_pending_finish(); void close(); - void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); + void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); void set_dependency(std::shared_ptr queue_dependency, @@ -252,7 +253,16 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { // Number of busy channels; std::atomic _busy_channels = 0; phmap::flat_hash_map _instance_to_receiver_eof; - phmap::flat_hash_map _instance_to_rpc_time; + struct RpcInstanceStatistics { + RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {} + InstanceLoId inst_lo_id; + int64_t 
rpc_count = 0; + int64_t max_time = 0; + int64_t min_time = INT64_MAX; + int64_t sum_time = 0; + }; + std::vector> _instance_to_rpc_stats_vec; + phmap::flat_hash_map _instance_to_rpc_stats; phmap::flat_hash_map _instance_to_rpc_ctx; std::atomic _is_finishing; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 3f3ab7368146bb..94cbe439a3e406 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -88,6 +88,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { TUnit ::TIME_NS, timer_name, 1); } + _runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id())); + return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b88b29ee8d0c0c..8b8cbd85f0f117 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -499,6 +499,17 @@ class RuntimeState { return _query_options.__isset.enable_profile && _query_options.enable_profile; } + bool enable_verbose_profile() const { + return enable_profile() && _query_options.__isset.enable_verbose_profile && + _query_options.enable_verbose_profile; + } + + int rpc_verbose_profile_max_instance_count() const { + return _query_options.__isset.rpc_verbose_profile_max_instance_count + ? 
_query_options.rpc_verbose_profile_max_instance_count + : 0; + } + bool enable_scan_node_run_serial() const { return _query_options.__isset.enable_scan_node_run_serial && _query_options.enable_scan_node_run_serial; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index d133d359b27d31..4ffa12e8f05b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -741,6 +741,7 @@ private void addScanRangeLocations(Partition partition, int useFixReplica = -1; boolean needCheckTags = false; boolean skipMissingVersion = false; + Set userSetBackendBlacklist = null; if (ConnectContext.get() != null) { allowedTags = ConnectContext.get().getResourceTags(); needCheckTags = ConnectContext.get().isResourceTagsSet(); @@ -751,6 +752,7 @@ private void addScanRangeLocations(Partition partition, LOG.debug("query id: {}, partition id:{} visibleVersion: {}", DebugUtil.printId(ConnectContext.get().queryId()), partition.getId(), visibleVersion); } + userSetBackendBlacklist = ConnectContext.get().getSessionVariable().getQueryBackendBlacklist(); } for (Tablet tablet : tablets) { long tabletId = tablet.getId(); @@ -866,6 +868,16 @@ private void addScanRangeLocations(Partition partition, + " does not exist or not alive"); continue; } + if (userSetBackendBlacklist != null && userSetBackendBlacklist.contains(backend.getId())) { + if (LOG.isDebugEnabled()) { + LOG.debug("backend {} is in the blacklist that user set in session variable {}", + replica.getBackendId(), replica.getId()); + } + String err = "replica " + replica.getId() + "'s backend " + replica.getBackendId() + + " in the blacklist that user set in session variable"; + errs.add(err); + continue; + } if (!backend.isMixNode()) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1b0c4bc6946be1..1a70e43cf77d43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -87,6 +87,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_EXECUTION_TIME = "max_execution_time"; public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_PROFILE = "enable_profile"; + public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile"; + public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count"; public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms"; public static final String SQL_MODE = "sql_mode"; public static final String WORKLOAD_VARIABLE = "workload_group"; @@ -415,6 +417,8 @@ public class SessionVariable implements Serializable, Writable { // fix replica to query. If num = 1, query the smallest replica, if 2 is the second smallest replica. public static final String USE_FIX_REPLICA = "use_fix_replica"; + public static final String QUERY_BACKEND_BLACKLIST = "query_backend_blacklist"; + public static final String DRY_RUN_QUERY = "dry_run_query"; // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3. @@ -705,6 +709,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true) + public boolean enableVerboseProfile = true; + + @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true) + public int rpcVerboseProfileMaxInstanceCount = 5; + // if true, need report to coordinator when plan fragment execute successfully. 
@VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true) public int autoProfileThresholdMs = -1; @@ -1475,6 +1485,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { @VariableMgr.VarAttr(name = USE_FIX_REPLICA) public int useFixReplica = -1; + // This is a debug feature, when we find a backend is not stable (for example, network reasons) + // we could use this variable to exclude it from query plan. It is only used for query. Not for + // load jobs. + // Users could set multiple backend ids separated by comma, like "10111,10112" + @VariableMgr.VarAttr(name = QUERY_BACKEND_BLACKLIST, needForward = true) + public String queryBackendBlacklist = ""; + @VariableMgr.VarAttr(name = DUMP_NEREIDS_MEMO) public boolean dumpNereidsMemo = false; @@ -3386,6 +3403,8 @@ public TQueryOptions toThrift() { tResult.setQueryTimeout(queryTimeoutS); tResult.setEnableProfile(enableProfile); + tResult.setEnableVerboseProfile(enableVerboseProfile); + tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount); if (enableProfile) { // If enable profile == true, then also set report success to true // be need report success to start report thread. But it is very tricky @@ -4050,6 +4069,21 @@ public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistributi this.ignoreStorageDataDistribution = ignoreStorageDataDistribution; } + // If anything wrong during parsing, just throw exception to forbid the query + // so there is not much exception handling logic here.
+ public Set getQueryBackendBlacklist() { + Set blacklist = Sets.newHashSet(); + if (Strings.isNullOrEmpty(queryBackendBlacklist)) { + return blacklist; + } + String[] backendIds = this.queryBackendBlacklist.trim().split(","); + for (int i = 0; i < backendIds.length; ++i) { + long backendId = Long.parseLong(backendIds[i].trim()); + blacklist.add(backendId); + } + return blacklist; + } + public boolean isForceJniScanner() { return forceJniScanner; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 41b8fb8cf02703..422b4baaff98cc 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -322,6 +322,9 @@ struct TQueryOptions { 127: optional i32 runtime_bloom_filter_max_size = 16777216; + 128: optional bool enable_verbose_profile = false; + 129: optional i32 rpc_verbose_profile_max_instance_count = 0; + // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false }