Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 261 additions & 38 deletions be/src/exec/rowid_fetcher.cpp

Large diffs are not rendered by default.

26 changes: 19 additions & 7 deletions be/src/exec/rowid_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ struct RowStoreReadStruct {

class RowIdStorageReader {
public:
// External profile info keys.
static const std::string ScannersRunningTimeProfile;
static const std::string InitReaderAvgTimeProfile;
static const std::string GetBlockAvgTimeProfile;
static const std::string FileReadLinesProfile;

static Status read_by_rowids(const PMultiGetRequest& request, PMultiGetResponse* response);
static Status read_by_rowids(const PMultiGetRequestV2& request, PMultiGetResponseV2* response);

Expand All @@ -107,13 +113,19 @@ class RowIdStorageReader {
int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms,
int64_t* lookup_row_data_ms);

static Status read_batch_external_row(const PRequestBlockDesc& request_block_desc,
std::shared_ptr<IdFileMap> id_file_map,
std::vector<SlotDescriptor>& slots,
std::shared_ptr<FileMapping> first_file_mapping,
const TUniqueId& query_id,
vectorized::Block& result_block, int64_t* init_reader_ms,
int64_t* get_block_ms);
static Status read_batch_external_row(
const uint64_t workload_group_id, const PRequestBlockDesc& request_block_desc,
std::shared_ptr<IdFileMap> id_file_map, std::vector<SlotDescriptor>& slots,
std::shared_ptr<FileMapping> first_file_mapping, const TUniqueId& query_id,
vectorized::Block& result_block, PRuntimeProfileTree* pprofile,
int64_t* init_reader_avg_ms, int64_t* get_block_avg_ms, size_t* scan_range_cnt);

// Plain aggregate of statistics for an external fetch.
// NOTE(review): the field meanings below are inferred from the names only;
// confirm against the code in rowid_fetcher.cpp that fills this struct.
struct ExternalFetchStatistics {
// presumably reader-initialization time in milliseconds; starts at 0 — TODO confirm
int64_t init_reader_ms = 0;
// presumably time spent fetching blocks in milliseconds; starts at 0 — TODO confirm
int64_t get_block_ms = 0;
// Kept as text rather than a number; presumably an already formatted byte count — TODO confirm
std::string file_read_bytes;
// Kept as text rather than a number; presumably an already formatted read-time value — TODO confirm
std::string file_read_times;
};
};

template <typename Func>
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/id_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,13 @@ class IdFileMap {

// Returns the current value of the delayed_expired_timestamp member.
int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }

void set_external_scan_params(QueryContext* query_ctx) {
// Copies the external-scan settings out of `query_ctx` and records
// `max_file_scanners`. The copy runs through std::call_once on
// once_flag_for_external, so only the first call stores anything; later calls
// leave the members untouched.
// `query_ctx` must not be null (checked by DCHECK, then dereferenced).
void set_external_scan_params(QueryContext* query_ctx, int max_file_scanners) {
    auto capture_params = [this, query_ctx, max_file_scanners] {
        DCHECK(query_ctx != nullptr);
        _query_global = query_ctx->get_query_globals();
        _query_options = query_ctx->get_query_options();
        _file_scan_range_params_map = query_ctx->file_scan_range_params_map;
        _max_file_scanners = max_file_scanners;
    };
    std::call_once(once_flag_for_external, capture_params);
}

Expand All @@ -214,6 +215,8 @@ class IdFileMap {
return _file_scan_range_params_map;
}

// Returns _max_file_scanners: the value stored by set_external_scan_params(),
// or the member's default of 10 if nothing has been stored yet.
int get_max_file_scanners() const { return _max_file_scanners; }

private:
std::shared_mutex _mtx;
uint32_t _init_id = 0;
Expand All @@ -225,6 +228,7 @@ class IdFileMap {
TQueryOptions _query_options;
std::map<int, TFileScanRangeParams> _file_scan_range_params_map;
std::once_flag once_flag_for_external;
int _max_file_scanners = 10;

// use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
Expand Down
35 changes: 35 additions & 0 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
#include <mutex>

#include "common/logging.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime_filter/runtime_filter_consumer.h"
#include "util/brpc_client_cache.h"
#include "vec/exec/scan/file_scanner.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/spill/spill_stream_manager.h"
Expand Down Expand Up @@ -481,6 +483,11 @@ Status MaterializationSharedState::merge_multi_response(vectorized::Block* block
DCHECK(rpc_struct.callback->response_->blocks_size() > i);
RETURN_IF_ERROR(
partial_block.deserialize(rpc_struct.callback->response_->blocks(i).block()));
if (rpc_struct.callback->response_->blocks(i).has_profile()) {
auto response_profile = RuntimeProfile::from_proto(
rpc_struct.callback->response_->blocks(i).profile());
_update_profile_info(backend_id, response_profile.get());
}

if (!partial_block.is_empty_column()) {
_block_maps[backend_id] = std::make_pair(std::move(partial_block), 0);
Expand Down Expand Up @@ -534,6 +541,34 @@ Status MaterializationSharedState::merge_multi_response(vectorized::Block* block
return Status::OK();
}

// Appends the info strings carried by one row-id fetch RPC profile to the
// per-backend text buffers in backend_profile_info_string.
//
// backend_id:       key of the per-backend entry to append to.
// response_profile: profile decoded from the RPC response; dereferenced
//                   unconditionally, so it must not be null.
//
// For every known info key the value is appended as "<value>, ". A key that the
// profile does not contain is logged and skipped; the other keys are still
// processed.
void MaterializationSharedState::_update_profile_info(int64_t backend_id,
                                                      RuntimeProfile* response_profile) {
    // try_emplace inserts an empty per-backend map on first use and otherwise
    // returns the existing entry, using one lookup instead of
    // contains() + emplace() + operator[].
    auto& info_map = backend_profile_info_string.try_emplace(backend_id).first->second;

    auto update_profile_info_key = [&](const std::string& info_key) {
        const auto* info_value = response_profile->get_info_string(info_key);
        if (info_value == nullptr) [[unlikely]] {
            LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key;
            return;
        }
        // Same single-lookup pattern for the per-key buffer.
        auto& buffer = info_map.try_emplace(info_key).first->second;
        fmt::format_to(buffer, "{}, ", *info_value);
    };

    update_profile_info_key(RowIdStorageReader::ScannersRunningTimeProfile);
    update_profile_info_key(RowIdStorageReader::InitReaderAvgTimeProfile);
    update_profile_info_key(RowIdStorageReader::GetBlockAvgTimeProfile);
    update_profile_info_key(RowIdStorageReader::FileReadLinesProfile);
    update_profile_info_key(vectorized::FileScanner::FileReadBytesProfile);
    update_profile_info_key(vectorized::FileScanner::FileReadTimeProfile);
}

void MaterializationSharedState::create_counter_dependency(int operator_id, int node_id,
const std::string& name) {
auto dep =
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,10 @@ struct MaterializationSharedState : public BasicSharedState {

void create_counter_dependency(int operator_id, int node_id, const std::string& name);

private:
void _update_profile_info(int64_t backend_id, RuntimeProfile* response_profile);

public:
bool rpc_struct_inited = false;
AtomicStatus rpc_status;

Expand All @@ -842,6 +846,8 @@ struct MaterializationSharedState : public BasicSharedState {
// Register each line in which block to ensure the order of the result.
// Zero means NULL value.
std::vector<std::vector<int64_t>> block_order_results;
// backend id => <rpc profile info string key, rpc profile info string value>.
std::map<int64_t, std::map<std::string, fmt::memory_buffer>> backend_profile_info_string;
};
#include "common/compile_check_end.h"
} // namespace doris::pipeline
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc

auto& id_file_map = state()->get_id_file_map();
if (id_file_map != nullptr) {
id_file_map->set_external_scan_params(state()->get_query_ctx());
id_file_map->set_external_scan_params(state()->get_query_ctx(), _max_scanners);
}

auto& p = _parent->cast<FileScanOperatorX>();
Expand Down
11 changes: 11 additions & 0 deletions be/src/pipeline/exec/materialization_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized
max_rpc_time = std::max(max_rpc_time, rpc_struct.rpc_timer.elapsed_time());
}
COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time);

for (const auto& [backend_id, child_info] :
local_state._shared_state->backend_profile_info_string) {
auto child_profile = local_state.operator_profile()->create_child(
"RowIDFetcher: BackendId:" + std::to_string(backend_id));
for (const auto& [info_key, info_value] :
local_state._shared_state->backend_profile_info_string[backend_id]) {
child_profile->add_info_string(info_key, "{" + fmt::to_string(info_value) + "}");
}
local_state.operator_profile()->add_child(child_profile, true);
}
}

return Status::OK();
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,

RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
ExecEnv* exec_env)
ExecEnv* exec_env,
const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker)
: _profile("PipelineX " + std::to_string(fragment_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
Expand All @@ -150,7 +151,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
_error_row_number(0) {
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
DCHECK(status.ok());
init_mem_trackers("<unnamed>");
_query_mem_tracker = query_mem_tracker;
DCHECK(_query_mem_tracker != nullptr);
}

RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class RuntimeState {
// Only used in the materialization phase of delayed materialization,
// when there may be no corresponding QueryContext.
RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
const TQueryGlobals& query_globals, ExecEnv* exec_env,
const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker);

// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);
Expand Down
140 changes: 140 additions & 0 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ std::unique_ptr<RuntimeProfile> RuntimeProfile::from_thrift(const TRuntimeProfil
return res;
}

// Builds a new RuntimeProfile from a serialized protobuf profile tree.
// A tree without nodes yields a profile with an empty name; otherwise the
// profile takes the name of the first node and is filled in via update(tree).
std::unique_ptr<RuntimeProfile> RuntimeProfile::from_proto(const PRuntimeProfileTree& tree) {
    const bool has_root = !tree.nodes().empty();
    if (has_root) {
        auto profile = std::make_unique<RuntimeProfile>(tree.nodes(0).name());
        profile->update(tree);
        return profile;
    }
    return std::make_unique<RuntimeProfile>("");
}

RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile)
: _pool(new ObjectPool()),
_name(name),
Expand Down Expand Up @@ -151,6 +162,12 @@ void RuntimeProfile::update(const TRuntimeProfileTree& thrift_profile) {
DCHECK_EQ(idx, thrift_profile.nodes.size());
}

// Merges a serialized protobuf profile tree into this profile, starting at the
// first node of its flattened node list.
//
// An empty tree is a no-op. Without this guard the node-list overload would
// read nodes.Get(0) on an empty list, because its DCHECK_LT bound check is
// compiled out of release builds.
void RuntimeProfile::update(const PRuntimeProfileTree& proto_profile) {
    if (proto_profile.nodes().empty()) {
        return;
    }
    int idx = 0;
    update(proto_profile.nodes(), &idx);
    // Every node of the tree should have been consumed by the recursive merge.
    DCHECK_EQ(idx, proto_profile.nodes_size());
}

void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, int* idx) {
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
Expand Down Expand Up @@ -233,6 +250,82 @@ void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, int*
}
}

// Merges the subtree that starts at nodes[*idx] into this profile.
//
// nodes: flattened profile tree; each node is immediately followed by its
//        children, each of which is followed by its own descendants.
// idx:   in/out cursor into `nodes`; on return it points just past the last
//        node consumed for this subtree.
//
// Counters are created or overwritten by name (a unit mismatch is logged and
// the counter left alone), child-counter relations are unioned, info strings
// are inserted or overwritten, and children are matched by name or created.
//
// Malformed input (cursor past the end, a display-order key missing from the
// info map, or fewer nodes than num_children promises) still trips a DCHECK in
// debug builds. In release builds it is logged and skipped instead of
// dereferencing an end iterator or reading past the node list.
void RuntimeProfile::update(const google::protobuf::RepeatedPtrField<PRuntimeProfileNode>& nodes,
                            int* idx) {
    DCHECK_LT(*idx, nodes.size());
    if (*idx >= nodes.size()) {
        LOG(WARNING) << "Profile node index " << *idx << " is out of range, node count is "
                     << nodes.size();
        return;
    }
    const PRuntimeProfileNode& node = nodes.Get(*idx);

    {
        std::lock_guard<std::mutex> l(_counter_map_lock);

        for (const auto& pcounter : node.counters()) {
            const std::string& name = pcounter.name();
            auto j = _counter_map.find(name);

            if (j == _counter_map.end()) {
                _counter_map[name] =
                        _pool->add(new Counter(unit_to_thrift(pcounter.type()), pcounter.value()));
            } else {
                if (unit_to_proto(j->second->type()) != pcounter.type()) {
                    LOG(ERROR) << "Cannot update counters with the same name (" << name
                               << ") but different types.";
                } else {
                    j->second->set(pcounter.value());
                }
            }
        }

        for (const auto& kv : node.child_counters_map()) {
            std::set<std::string>* child_counters =
                    find_or_insert(&_child_counter_map, kv.first, std::set<std::string>());
            for (const auto& child_name : kv.second.child_counters()) {
                child_counters->insert(child_name);
            }
        }
    }

    {
        std::lock_guard<std::mutex> l(_info_strings_lock);
        const auto& info_map = node.info_strings();

        for (const std::string& key : node.info_strings_display_order()) {
            auto it = info_map.find(key);
            DCHECK(it != info_map.end());
            if (it == info_map.end()) {
                // The display order names a key the map does not carry; skip it
                // rather than dereference the end iterator.
                LOG(WARNING) << "Profile info string key " << key
                             << " is listed in display order but has no value.";
                continue;
            }

            auto existing = _info_strings.find(key);
            if (existing == _info_strings.end()) {
                _info_strings.insert(std::make_pair(key, it->second));
                _info_strings_display_order.push_back(key);
            } else {
                // Reuse the iterator instead of looking the key up a second time.
                existing->second = it->second;
            }
        }
    }

    ++*idx;

    {
        std::lock_guard<std::mutex> l(_children_lock);
        for (int i = 0; i < node.num_children(); ++i) {
            DCHECK_LT(*idx, nodes.size());
            if (*idx >= nodes.size()) {
                // The node claims more children than the list contains.
                LOG(WARNING) << "Profile node " << node.name() << " declares "
                             << node.num_children()
                             << " children but the node list ended after " << i;
                break;
            }
            const PRuntimeProfileNode& pchild = nodes.Get(*idx);
            RuntimeProfile* child = nullptr;

            auto j = _child_map.find(pchild.name());
            if (j != _child_map.end()) {
                child = j->second;
            } else {
                child = _pool->add(new RuntimeProfile(pchild.name()));
                child->_metadata = pchild.metadata();
                child->_timestamp = pchild.timestamp();
                _child_map[pchild.name()] = child;
                _children.emplace_back(child, pchild.indent());
            }

            // The recursive call advances *idx past the child's whole subtree.
            child->update(nodes, idx);
        }
    }
}

void RuntimeProfile::divide(int n) {
DCHECK_GT(n, 0);
std::map<std::string, Counter*>::iterator iter;
Expand Down Expand Up @@ -626,6 +719,53 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64_t
}
}

// Serializes this profile and its descendants into `tree`, discarding any
// nodes the tree held before. `profile_level` is forwarded unchanged to the
// node-list overload.
void RuntimeProfile::to_proto(PRuntimeProfileTree* tree, int64_t profile_level) {
    auto* flattened_nodes = tree->mutable_nodes();
    flattened_nodes->Clear();
    to_proto(flattened_nodes, profile_level);
}

// Appends this profile to `nodes`, followed by each child and that child's
// descendants.
//
// nodes:         output list; one entry is added for this profile and one for
//                every descendant.
// profile_level: passed to the counter-tree pruning step and on to every child.
void RuntimeProfile::to_proto(google::protobuf::RepeatedPtrField<PRuntimeProfileNode>* nodes,
                              int64_t profile_level) {
    PRuntimeProfileNode* self_node = nodes->Add();
    self_node->set_name(_name);
    self_node->set_metadata(_metadata);
    self_node->set_timestamp(_timestamp);
    // Provisional value: when this call was made by a parent, the parent
    // overwrites it below with the indent flag stored next to the child.
    self_node->set_indent(true);

    {
        std::lock_guard<std::mutex> counter_guard(_counter_map_lock);
        RuntimeProfileCounterTreeNode pruned_tree = RuntimeProfileCounterTreeNode::from_map(
                _counter_map, _child_counter_map, ROOT_COUNTER);
        pruned_tree = RuntimeProfileCounterTreeNode::prune_the_tree(pruned_tree, profile_level);
        pruned_tree.to_proto(self_node->mutable_counters(),
                             self_node->mutable_child_counters_map());
    }

    {
        std::lock_guard<std::mutex> info_guard(_info_strings_lock);
        auto& proto_info = *self_node->mutable_info_strings();
        for (const auto& [info_key, info_value] : _info_strings) {
            proto_info[info_key] = info_value;
        }
        for (const auto& ordered_key : _info_strings_display_order) {
            self_node->add_info_strings_display_order(ordered_key);
        }
    }

    // Copy the child list under the lock, then recurse without holding it.
    ChildVector snapshot;
    {
        std::lock_guard<std::mutex> children_guard(_children_lock);
        snapshot = _children;
    }

    self_node->set_num_children(snapshot.size());

    for (const auto& [child_profile, child_indent] : snapshot) {
        // The child's own node lands at the current end of the list.
        const int child_pos = nodes->size();
        child_profile->to_proto(nodes, profile_level);
        nodes->Mutable(child_pos)->set_indent(child_indent);
    }
}

int64_t RuntimeProfile::units_per_second(const RuntimeProfile::Counter* total_counter,
const RuntimeProfile::Counter* timer) {
DCHECK(total_counter->type() == TUnit::BYTES || total_counter->type() == TUnit::UNIT);
Expand Down
Loading
Loading