Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators(

LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
source_deps.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
mem_counters.resize(num_instances, nullptr);
}

vectorized::MutableColumns AggSharedState::_get_keys_hash_table() {
Expand Down
13 changes: 5 additions & 8 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ struct AggSharedState : public BasicSharedState {
vectorized::Sizes offsets_of_aggregate_states;
std::vector<size_t> make_nullable_keys;

struct MemoryRecord {
int64_t used_in_arena {};
int64_t used_in_state {};
};
MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
bool enable_spill = false;
bool reach_limit = false;
Expand Down Expand Up @@ -829,7 +824,7 @@ struct LocalExchangeSharedState : public BasicSharedState {
LocalExchangeSharedState(int num_instances);
~LocalExchangeSharedState() override;
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::vector<RuntimeProfile::Counter*> mem_counters;
std::atomic<int64_t> mem_usage = 0;
// mem_usage must be increased before the block is enqueued; otherwise a concurrent dequeue could subtract first and drive mem_usage negative.
std::mutex le_lock;
Expand Down Expand Up @@ -865,13 +860,15 @@ struct LocalExchangeSharedState : public BasicSharedState {
}

void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
mem_trackers[channel_id]->consume(delta);
mem_counters[channel_id]->update(delta);
if (update_total_mem_usage) {
add_total_mem_usage(delta, channel_id);
}
}

void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); }
// Decreases the memory counter of channel `channel_id` by `delta` bytes.
// The decrease is applied by passing a negative delta to the counter's update().
void sub_mem_usage(int channel_id, size_t delta) {
    // Convert to a signed type before negating: negating the unsigned size_t
    // directly would wrap around instead of producing a negative value.
    mem_counters[channel_id]->update(-static_cast<int64_t>(delta));
}

virtual void add_total_mem_usage(size_t delta, int channel_id) {
if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
Expand Down
46 changes: 18 additions & 28 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
_hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
_hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
TUnit::BYTES, "MemoryUsage", 1);
_serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
Expand Down Expand Up @@ -227,24 +227,17 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
int64_t arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_UPDATE(
_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
_shared_state->aggregate_data_container->memory_usage();
int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes();

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
}},
_agg_data->method_variant);
}
Expand Down Expand Up @@ -423,11 +416,10 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
}

void AggSinkLocalState::_update_memusage_without_key() {
auto arena_memory_usage =
_agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
int64_t arena_memory_usage = _agg_arena_pool->size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the agg hash table memory usage lost here?

COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
Expand Down Expand Up @@ -876,8 +868,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {

std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state +
Base::_shared_state->mem_usage_record.used_in_arena);
return Base::close(state, exec_status);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This counter is not updated?

RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;

bool _should_limit_output = false;

Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,17 +461,13 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
if (auto rows = block->rows()) {
_num_rows_returned += rows;
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
} else {
reached_limit(block, eos);
}
} else {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_blocks_memory_usage =
_profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
return Status::OK();
}
Expand Down Expand Up @@ -312,8 +311,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
}

local_state.mem_tracker()->consume(input_block->allocated_bytes());
local_state._blocks_memory_usage->add(input_block->allocated_bytes());
int64_t block_mem_usage = input_block->allocated_bytes();
COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());
COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);

//TODO: if further improvement is needed, one option is to maintain a free queue
//so that the memory can be reused instead of being allocated and freed again.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
bool _whether_need_next_partition(BlockRowPos& found_partition_end);

RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _blocks_memory_usage = nullptr;

std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
};
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_blocks_memory_usage =
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
return Status::OK();
}
Expand Down Expand Up @@ -443,7 +443,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) {
Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
mem_tracker()->consume(-block->allocated_bytes());
if (_shared_state->origin_cols.size() < block->columns()) {
block->erase_not_in(_shared_state->origin_cols);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::Cancelled("Expected {} {} to be returned by expression {}",
to_string_lambda(_assertion), _desired_num_rows, _subquery_string);
}
COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
local_state._current_query_cache_rows += output_block->rows();
auto mem_consume = output_block->allocated_bytes();
local_state._current_query_cache_bytes += mem_consume;
local_state._mem_tracker->consume(mem_consume);

if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes ||
_cache_param.entry_max_rows < local_state._current_query_cache_rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
// If the limit is not reached, it is important to ensure that _aggregated_block is empty
// because it may still contain data.
// However, if the limit is reached, there is no need to output data even if some exists.
Expand Down
30 changes: 28 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
Expand Down Expand Up @@ -303,6 +306,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
Expand Down Expand Up @@ -435,8 +439,30 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
_turn_off_channel(id, true);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
}
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
Expand Down
Loading