Merged
2 changes: 1 addition & 1 deletion be/src/pipeline/dependency.cpp
@@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators(
 
 LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
     source_deps.resize(num_instances, nullptr);
-    mem_trackers.resize(num_instances, nullptr);
+    mem_counters.resize(num_instances, nullptr);
 }
 
 vectorized::MutableColumns AggSharedState::_get_keys_hash_table() {
13 changes: 5 additions & 8 deletions be/src/pipeline/dependency.h
@@ -324,11 +324,6 @@ struct AggSharedState : public BasicSharedState {
     vectorized::Sizes offsets_of_aggregate_states;
     std::vector<size_t> make_nullable_keys;
 
-    struct MemoryRecord {
-        int64_t used_in_arena {};
-        int64_t used_in_state {};
-    };
-    MemoryRecord mem_usage_record;
     bool agg_data_created_without_key = false;
     bool enable_spill = false;
     bool reach_limit = false;
@@ -825,7 +820,7 @@ struct LocalExchangeSharedState : public BasicSharedState {
     LocalExchangeSharedState(int num_instances);
     ~LocalExchangeSharedState() override;
     std::unique_ptr<ExchangerBase> exchanger {};
-    std::vector<MemTracker*> mem_trackers;
+    std::vector<RuntimeProfile::Counter*> mem_counters;
     std::atomic<int64_t> mem_usage = 0;
     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
@@ -861,13 +856,15 @@ struct LocalExchangeSharedState : public BasicSharedState {
     }
 
     void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
-        mem_trackers[channel_id]->consume(delta);
+        mem_counters[channel_id]->update(delta);
         if (update_total_mem_usage) {
             add_total_mem_usage(delta, channel_id);
         }
     }
 
-    void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); }
+    void sub_mem_usage(int channel_id, size_t delta) {
+        mem_counters[channel_id]->update(-(int64_t)delta);
+    }
 
     virtual void add_total_mem_usage(size_t delta, int channel_id) {
         if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
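The cast in the new sub_mem_usage body is doing real work: delta is a size_t, and negating an unsigned value wraps around rather than going negative. A minimal standalone sketch of the pitfall (plain C++, hypothetical names, not Doris code):

```cpp
#include <cstdint>
#include <cstdio>

// Minimal sketch (not Doris code): why the delta is cast to int64_t before
// negation. Unary minus on an unsigned value wraps around instead of producing
// a negative number, so the counter would not be decremented as intended.
int main() {
    std::size_t delta = 4096;

    // Wrapped: still unsigned, equal to 2^64 - 4096 with a 64-bit size_t.
    std::size_t wrapped = -delta;

    // Intended: cast first, then negate, giving a plain signed -4096.
    std::int64_t signed_delta = -static_cast<std::int64_t>(delta);

    std::printf("wrapped      = %zu\n", wrapped);
    std::printf("signed_delta = %lld\n", static_cast<long long>(signed_delta));
    return 0;
}
```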
46 changes: 18 additions & 28 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _agg_data = Base::_shared_state->agg_data.get();
     _agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
     _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
-                                                            TUnit::BYTES, "MemoryUsage", 1);
-    _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
+    _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
+            Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
@@ -223,24 +223,17 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
             },
             [&](auto& agg_method) -> void {
                 auto& data = *agg_method.hash_table;
-                auto arena_memory_usage =
+                int64_t arena_memory_usage =
                         _agg_arena_pool->size() +
-                        Base::_shared_state->aggregate_data_container->memory_usage() -
-                        Base::_shared_state->mem_usage_record.used_in_arena;
-                Base::_mem_tracker->consume(arena_memory_usage);
-                Base::_mem_tracker->consume(
-                        data.get_buffer_size_in_bytes() -
-                        Base::_shared_state->mem_usage_record.used_in_state);
-                _serialize_key_arena_memory_usage->add(arena_memory_usage);
-                COUNTER_UPDATE(
-                        _hash_table_memory_usage,
-                        data.get_buffer_size_in_bytes() -
-                        Base::_shared_state->mem_usage_record.used_in_state);
-                Base::_shared_state->mem_usage_record.used_in_state =
-                        data.get_buffer_size_in_bytes();
-                Base::_shared_state->mem_usage_record.used_in_arena =
-                        _agg_arena_pool->size() +
-                        Base::_shared_state->aggregate_data_container->memory_usage();
+                        _shared_state->aggregate_data_container->memory_usage();
+                int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes();
+
+                COUNTER_SET(_memory_used_counter,
+                            arena_memory_usage + hash_table_memory_usage);
+                COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());
+
+                COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
+                COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
             }},
             _agg_data->method_variant);
 }
@@ -419,11 +412,10 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
 }
 
 void AggSinkLocalState::_update_memusage_without_key() {
-    auto arena_memory_usage =
-            _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
-    Base::_mem_tracker->consume(arena_memory_usage);
-    _serialize_key_arena_memory_usage->add(arena_memory_usage);
-    Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
+    int64_t arena_memory_usage = _agg_arena_pool->size();
+    COUNTER_SET(_memory_used_counter, arena_memory_usage);
+    COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
+    COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
@@ -875,8 +867,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
 
     std::vector<char> tmp_deserialize_buffer;
     _deserialize_buffer.swap(tmp_deserialize_buffer);
-    Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state +
-                                Base::_shared_state->mem_usage_record.used_in_arena);
     return Base::close(state, exec_status);
 }

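Taken together, the aggregation-sink changes swap delta-based MemTracker accounting, which needed the mem_usage_record bookkeeping struct, for counters that are set to the current absolute sizes on every update. A minimal sketch of the two styles, using hypothetical Counter/DeltaReporter types rather than the real RuntimeProfile API and assuming the peak counter behaves as a running maximum:

```cpp
#include <algorithm>
#include <cstdint>
#include <initializer_list>

// Minimal sketch (hypothetical types, not the Doris API) contrasting the two
// accounting styles touched by this diff.
struct Counter {
    int64_t value = 0;
    void update(int64_t delta) { value += delta; }  // COUNTER_UPDATE-like
    void set(int64_t v) { value = v; }              // COUNTER_SET-like
};

// Old style: delta accounting, requires remembering the last reported size.
struct DeltaReporter {
    int64_t used_in_arena = 0;  // analogous to mem_usage_record.used_in_arena
    void report(Counter& tracker, int64_t arena_size) {
        tracker.update(arena_size - used_in_arena);
        used_in_arena = arena_size;
    }
};

// New style: absolute accounting; peak kept as a running maximum (assumption).
void report_absolute(Counter& used, Counter& peak, int64_t arena_size) {
    used.set(arena_size);
    peak.set(std::max(peak.value, used.value));
}

int main() {
    Counter tracker, used, peak;
    DeltaReporter old_style;
    for (int64_t size : {1024, 4096, 2048}) {
        old_style.report(tracker, size);    // tracker follows the same value,
        report_absolute(used, peak, size);  // but needs stored per-operator state
    }
    // Both end at 2048; peak additionally remembers the 4096 high-water mark.
    return 0;
}
```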
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
@@ -106,7 +106,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
+    RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;
 
     bool _should_limit_output = false;

4 changes: 0 additions & 4 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -450,17 +450,13 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
             vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
             if (auto rows = block->rows()) {
                 _num_rows_returned += rows;
-                COUNTER_UPDATE(_blocks_returned_counter, 1);
-                COUNTER_SET(_rows_returned_counter, _num_rows_returned);
             }
         } else {
             reached_limit(block, eos);
         }
     } else {
         if (auto rows = block->rows()) {
             _num_rows_returned += rows;
-            COUNTER_UPDATE(_blocks_returned_counter, 1);
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
         }
     }
 }
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -33,6 +33,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
     _compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime");
     _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
+    _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1);
     return Status::OK();
 }

@@ -322,8 +323,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
         }
     }
 
-    COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
+    int64_t block_mem_usage = input_block->allocated_bytes();
+    COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
     COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());
+    COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);
 
     //TODO: if need improvement, the is a tips to maintain a free queue,
     //so the memory could reuse, no need to new/delete again;
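The new MemoryUsageBlocks counter is paired across the analytic sink and source: the sink adds each buffered block's bytes, and the source operator subtracts them when a block is handed back, so the counter reflects only what is still buffered. A minimal sketch of that pairing with hypothetical types, not the Doris classes:

```cpp
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Minimal sketch (hypothetical types, not the Doris classes) of paired
// add/subtract accounting for buffered blocks.
struct Counter {
    int64_t value = 0;
    void add(int64_t delta) { value += delta; }
};

struct Block {
    std::vector<char> data;
    int64_t allocated_bytes() const { return static_cast<int64_t>(data.size()); }
};

struct AnalyticBuffer {
    std::deque<Block> input_blocks;
    Counter blocks_memory_usage;

    void sink(Block block) {
        blocks_memory_usage.add(block.allocated_bytes());   // accounted on entry
        input_blocks.push_back(std::move(block));
    }

    Block output_next() {
        Block block = std::move(input_blocks.front());
        input_blocks.pop_front();
        blocks_memory_usage.add(-block.allocated_bytes());  // released on exit
        return block;
    }
};

int main() {
    AnalyticBuffer buffer;
    buffer.sink(Block{std::vector<char>(256)});
    buffer.sink(Block{std::vector<char>(512)});
    Block out = buffer.output_next();
    // blocks_memory_usage.value == 512: only the still-buffered block is counted.
    (void)out;
    return 0;
}
```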
1 change: 1 addition & 0 deletions be/src/pipeline/exec/analytic_sink_operator.h
@@ -61,6 +61,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
     RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
     RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
     RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+    RuntimeProfile::Counter* _blocks_memory_usage = nullptr;
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
 };
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.cpp
@@ -444,7 +444,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) {
 Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
     block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
     _blocks_memory_usage->add(-block->allocated_bytes());
-    mem_tracker()->consume(-block->allocated_bytes());
     if (_shared_state->origin_cols.size() < block->columns()) {
         block->erase_not_in(_shared_state->origin_cols);
     }
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -115,8 +115,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
         return Status::Cancelled("Expected {} {} to be returned by expression {}",
                                  to_string_lambda(_assertion), _desired_num_rows, _subquery_string);
     }
-    COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
-    COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
                                                            block->columns()));
     return Status::OK();
1 change: 0 additions & 1 deletion be/src/pipeline/exec/cache_source_operator.cpp
@@ -159,7 +159,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
             local_state._current_query_cache_rows += output_block->rows();
             auto mem_consume = output_block->allocated_bytes();
             local_state._current_query_cache_bytes += mem_consume;
-            local_state._mem_tracker->consume(mem_consume);
 
             if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes ||
                 _cache_param.entry_max_rows < local_state._current_query_cache_rows) {
@@ -461,7 +461,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
                                                                block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
-    COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     // If the limit is not reached, it is important to ensure that _aggregated_block is empty
     // because it may still contain data.
     // However, if the limit is reached, there is no need to output data even if some exists.
16 changes: 16 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -156,6 +156,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
     if (request.block) {
         RETURN_IF_ERROR(
                 BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
+        COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
+        COUNTER_SET(_parent->peak_memory_usage_counter(),
+                    _parent->memory_used_counter()->value());
     }
     _instance_to_package_queue[ins_id].emplace(std::move(request));
     _total_queue_size++;
@@ -291,6 +294,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         if (request.block) {
             static_cast<void>(brpc_request->release_block());
+            COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
         }
         q.pop();
         _total_queue_size--;
@@ -416,12 +420,24 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
+    for (; !broadcast_q.empty(); broadcast_q.pop()) {
+        if (broadcast_q.front().block_holder->get_block()) {
+            COUNTER_UPDATE(_parent->memory_used_counter(),
+                           -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+        }
+    }
     {
         std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
         swap(empty, broadcast_q);
     }
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
+    for (; !q.empty(); q.pop()) {
+        if (q.front().block) {
+            COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
+        }
+    }
+
     {
         std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
         swap(empty, q);
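The _set_receiver_eof changes follow from the add_block change above: once queued blocks are counted into memory_used_counter, any path that discards a pending queue without sending must give those bytes back before clearing it. A minimal sketch of that drain-and-release pattern with hypothetical types, not the Doris classes:

```cpp
#include <cstdint>
#include <memory>
#include <queue>
#include <vector>

// Minimal sketch (hypothetical types, not the Doris classes): bytes are added
// to the memory counter when a block is queued, so when a receiver finishes
// early the pending queue is walked and every queued block's bytes are
// subtracted before the queue is cleared; otherwise they would stay counted.
struct Counter {
    int64_t value = 0;
    void update(int64_t delta) { value += delta; }
};

struct Block {
    std::vector<char> payload;
    int64_t byte_size() const { return static_cast<int64_t>(payload.size()); }
};

struct TransmitInfo {
    std::shared_ptr<Block> block;  // may be null for control-only entries
};

void drain_pending_queue(std::queue<TransmitInfo>& q, Counter& memory_used) {
    for (; !q.empty(); q.pop()) {
        if (q.front().block) {
            memory_used.update(-q.front().block->byte_size());
        }
    }
}

int main() {
    Counter memory_used;
    std::queue<TransmitInfo> q;

    auto block = std::make_shared<Block>(Block{std::vector<char>(128)});
    memory_used.update(block->byte_size());  // counted when enqueued
    q.push(TransmitInfo{std::move(block)});
    q.push(TransmitInfo{nullptr});           // entry without a block

    drain_pending_queue(q, memory_used);
    // memory_used.value == 0 and the queue is empty.
    return 0;
}
```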