Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,14 @@ CONF_Bool(disable_mem_pools, "false");
// to a relative large number or the performance is very very bad.
CONF_Bool(use_mmap_allocate_chunk, "false");

// Chunk Allocator's reserved bytes limit,
// Default value is 2GB, increase this variable can improve performance, but will
// acquire more free memory which can not be used by other modules
CONF_Int64(chunk_reserved_bytes_limit, "2147483648");
// The reserved bytes limit of the Chunk Allocator, usually set as a percentage of mem_limit.
// The value defaults to bytes if no unit is given, and the number of bytes must be a multiple of 2.
// It must be larger than 0; if it is larger than the physical memory size, it will be capped to the physical memory size.
// Increasing this variable can improve performance,
// but it will acquire more free memory which cannot be used by other modules.
CONF_mString(chunk_reserved_bytes_limit, "20%");
// The minimum chunk size (in bytes) managed by the chunk allocator; must be a power of two. Default: 1024.
CONF_Int32(min_chunk_reserved_bytes, "1024");

// The probing algorithm of partitioned hash table.
// Enable quadratic probing hash table
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ void Daemon::init(int argc, char** argv, const std::vector<StorePath>& paths) {

init_doris_metrics(paths);
init_signals();

ChunkAllocator::init_instance(config::chunk_reserved_bytes_limit);
}

void Daemon::start() {
Expand Down
23 changes: 23 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,29 @@ Status ExecEnv::_init_mem_tracker() {
RETURN_IF_ERROR(_disk_io_mgr->init(global_memory_limit_bytes));
RETURN_IF_ERROR(_tmp_file_mgr->init());

// 5. init chunk allocator
if (!BitUtil::IsPowerOf2(config::min_chunk_reserved_bytes)) {
ss << "Config min_chunk_reserved_bytes must be a power-of-two: "
<< config::min_chunk_reserved_bytes;
return Status::InternalError(ss.str());
}

int64_t chunk_reserved_bytes_limit =
ParseUtil::parse_mem_spec(config::chunk_reserved_bytes_limit, global_memory_limit_bytes,
MemInfo::physical_mem(), &is_percent);
if (chunk_reserved_bytes_limit <= 0) {
ss << "Invalid config chunk_reserved_bytes_limit value, must be a percentage or "
"positive bytes value or percentage: "
<< config::chunk_reserved_bytes_limit;
return Status::InternalError(ss.str());
}
chunk_reserved_bytes_limit =
BitUtil::RoundDown(chunk_reserved_bytes_limit, config::min_chunk_reserved_bytes);
ChunkAllocator::init_instance(chunk_reserved_bytes_limit);
LOG(INFO) << "Chunk allocator memory limit: "
<< PrettyPrinter::print(chunk_reserved_bytes_limit, TUnit::BYTES)
<< ", origin config value: " << config::chunk_reserved_bytes_limit;

// TODO(zc): The current memory usage configuration is a bit confusing,
// we need to sort out the use of memory
return Status::OK();
Expand Down
32 changes: 27 additions & 5 deletions be/src/runtime/memory/chunk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_count, MetricUnit::
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_cost_ns, MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_cost_ns, MetricUnit::NANOSECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(chunk_pool_reserved_bytes, MetricUnit::NOUNIT);

static IntCounter* chunk_pool_local_core_alloc_count;
static IntCounter* chunk_pool_other_core_alloc_count;
static IntCounter* chunk_pool_system_alloc_count;
static IntCounter* chunk_pool_system_free_count;
static IntCounter* chunk_pool_system_alloc_cost_ns;
static IntCounter* chunk_pool_system_free_cost_ns;
static IntGauge* chunk_pool_reserved_bytes;

#ifdef BE_TEST
static std::mutex s_mutex;
Expand Down Expand Up @@ -115,6 +117,7 @@ void ChunkAllocator::init_instance(size_t reserve_limit) {

ChunkAllocator::ChunkAllocator(size_t reserve_limit)
: _reserve_bytes_limit(reserve_limit),
_steal_arena_limit(reserve_limit * 0.1),
_reserved_bytes(0),
_arenas(CpuInfo::get_max_num_cores()) {
_mem_tracker =
Expand All @@ -132,6 +135,7 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count);
INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns);
INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns);
INT_GAUGE_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_reserved_bytes);
}

Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, bool check_limits) {
Expand All @@ -158,8 +162,11 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker,
chunk_pool_local_core_alloc_count->increment(1);
return Status::OK();
}
if (_reserved_bytes > size) {
// try to allocate from other core's arena
// Second path: try to allocate from other core's arena
// When the reserved bytes is greater than the limit, the chunk is stolen from other arena.
// Otherwise, it is allocated from the system first, which can reserve enough memory as soon as possible.
// After that, allocate from current core arena as much as possible.
if (_reserved_bytes > _steal_arena_limit) {
++core_id;
for (int i = 1; i < _arenas.size(); ++i, ++core_id) {
if (_arenas[core_id % _arenas.size()]->pop_free_chunk(size, &chunk->data)) {
Expand Down Expand Up @@ -192,16 +199,14 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker,

void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) {
// The chunk's memory ownership is transferred from tls tracker to ChunkAllocator.
DCHECK(chunk.core_id != -1);
if (tracker) {
tracker->transfer_to(_mem_tracker.get(), chunk.size);
} else {
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
chunk.size);
}
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (chunk.core_id == -1) {
return;
}
int64_t old_reserved_bytes = _reserved_bytes;
int64_t new_reserved_bytes = 0;
do {
Expand All @@ -219,6 +224,15 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) {
}
} while (!_reserved_bytes.compare_exchange_weak(old_reserved_bytes, new_reserved_bytes));

// Because the memory size of each allocate/free is a multiple of 2, `_reserved_bytes % 100 == 32`
// will definitely happen from time to time, at which point the latest `_reserved_bytes` value is
// published to the metric. A real-time, exact `_reserved_bytes` value is not required: usually the
// value of `_reserved_bytes` equals the ChunkAllocator MemTracker, and the `_reserved_bytes` metric
// only matters when verifying the accuracy of the MemTracker.
// Sampling this way reduces the number of metric updates and thus the performance impact.
if (_reserved_bytes % 100 == 32) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to make sure the correctness?

At the first look, (_reserved_bytes % 100 < 32) or (_reserved_bytes % 100 > 32) both will not update the metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the first look, ChunkAllocator will allocate/free many times, the memory size of each allocate/free is a multiple of 2, so _reserved_bytes% 100 == 32 will definitely happen, and the latest _reserved_bytes value will be set each time .

The real-time and accurate _reserved_bytes value is not required. Usually, the value of _reserved_bytes is equal to ChunkAllocator MemTracker. The _reserved_bytes metric is only concerned when verifying the accuracy of MemTracker.

Therefore, reduce the number of sets and reduce the performance impact.

chunk_pool_reserved_bytes->set_value(_reserved_bytes);
}
_arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size);
}

Expand All @@ -227,4 +241,12 @@ Status ChunkAllocator::allocate_align(size_t size, Chunk* chunk, MemTracker* tra
return allocate(BitUtil::RoundUpToPowerOfTwo(size), chunk, tracker, check_limits);
}

// Transfer ownership of a raw buffer to the chunk allocator by wrapping it
// in a Chunk bound to the calling thread's current core, then delegating to
// the Chunk-based free() overload.
// Note: 'size' must equal the allocated length of 'data', otherwise the
// allocator's accounting will be wrong.
void ChunkAllocator::free(uint8_t* data, size_t size, MemTracker* tracker) {
    Chunk wrapped;
    wrapped.core_id = CpuInfo::get_current_core();
    wrapped.size = size;
    wrapped.data = data;
    free(wrapped, tracker);
}

} // namespace doris
9 changes: 9 additions & 0 deletions be/src/runtime/memory/chunk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,19 @@ class ChunkAllocator {
// Free chunk allocated from this allocator
void free(const Chunk& chunk, MemTracker* tracker = nullptr);

// Transfer the memory ownership to the chunk allocator.
// If the chunk allocator is full, then free to the system.
// Note: make sure that the length of 'data' is equal to size,
// otherwise the capacity of chunk allocator will be wrong.
void free(uint8_t* data, size_t size, MemTracker* tracker = nullptr);

private:
static ChunkAllocator* _s_instance;

size_t _reserve_bytes_limit;
// When the reserved chunk memory size is greater than the limit,
// it is allowed to steal the chunks of other arenas.
size_t _steal_arena_limit;
std::atomic<int64_t> _reserved_bytes;
// each core has a ChunkArena
std::vector<std::unique_ptr<ChunkArena>> _arenas;
Expand Down
Loading