Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CONF_String(priority_networks, "");

// memory mode
// performance, moderate or compact
CONF_String(memory_mode, "performance");
CONF_String(memory_mode, "moderate");

// process memory limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
Expand Down
118 changes: 98 additions & 20 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,109 @@ void Daemon::tcmalloc_gc_thread() {
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
!defined(USE_JEMALLOC)

size_t tc_use_memory_min = MemInfo::mem_limit();
// Limit size of tcmalloc cache via release_rate and max_cache_percent.
// We adjust release_rate according to memory_pressure, which is usage percent of memory.
int64_t max_cache_percent = 60;
double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0};
int64_t pressure_limit = 90;
bool is_performance_mode = false;
size_t physical_limit_bytes = std::min(MemInfo::hard_mem_limit(), MemInfo::mem_limit());

if (config::memory_mode == std::string("performance")) {
tc_use_memory_min = std::max(tc_use_memory_min / 10 * 9,
tc_use_memory_min - (size_t)10 * 1024 * 1024 * 1024);
} else {
tc_use_memory_min >>= 1;
max_cache_percent = 100;
pressure_limit = 90;
is_performance_mode = true;
physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem());
} else if (config::memory_mode == std::string("compact")) {
max_cache_percent = 20;
pressure_limit = 80;
}

while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) {
size_t used_size = 0;
size_t free_size = 0;

int last_ms = 0;
const int kMaxLastMs = 30000;
const int kIntervalMs = 10;
size_t init_aggressive_decommit = 0;
size_t current_aggressive_decommit = 0;
size_t expected_aggressive_decommit = 0;
int64_t last_memory_pressure = 0;

MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
&init_aggressive_decommit);
current_aggressive_decommit = init_aggressive_decommit;

while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) {
size_t tc_used_bytes = 0;
size_t tc_alloc_bytes = 0;
size_t rss = PerfCounters::get_vm_rss();

MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
&tc_alloc_bytes);
MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
&used_size);
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", &free_size);
size_t alloc_size = used_size + free_size;
LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size
<< ", generic.current_allocated_bytes " << used_size << ", tc_use_memory_min "
<< tc_use_memory_min;

if (alloc_size > tc_use_memory_min) {
size_t max_free_size = alloc_size * 20 / 100;
if (free_size > max_free_size) {
MallocExtension::instance()->ReleaseToSystem(free_size - max_free_size);
&tc_used_bytes);
int64_t tc_cached_bytes = tc_alloc_bytes - tc_used_bytes;
int64_t to_free_bytes =
(int64_t)tc_cached_bytes - (tc_used_bytes * max_cache_percent / 100);

int64_t memory_pressure = 0;
int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
memory_pressure = alloc_bytes * 100 / physical_limit_bytes;

expected_aggressive_decommit = init_aggressive_decommit;
if (memory_pressure > pressure_limit) {
// We are close to OOM, so release the cache aggressively.
// Ideally, we would reuse the cache and stop allocating from the system;
// however, it is hard to limit the size of tcmalloc's cache, and Doris
// uses mmap in vectorized mode.
if (last_memory_pressure <= pressure_limit) {
int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10;
to_free_bytes = std::max(to_free_bytes, min_free_bytes);
to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100);
to_free_bytes = std::min(to_free_bytes, tc_cached_bytes);
expected_aggressive_decommit = 1;
} else {
// release rate is enough.
to_free_bytes = 0;
}
last_ms = kMaxLastMs;
} else if (memory_pressure > (pressure_limit - 10)) {
if (last_memory_pressure <= (pressure_limit - 10)) {
to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100);
} else {
to_free_bytes = 0;
}
}

int release_rate_index = memory_pressure / 10;
double release_rate = 1.0;
if (release_rate_index >= sizeof(release_rates)) {
release_rate = 2000.0;
} else {
release_rate = release_rates[release_rate_index];
}
MallocExtension::instance()->SetMemoryReleaseRate(release_rate);

if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) {
MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
expected_aggressive_decommit);
current_aggressive_decommit = expected_aggressive_decommit;
}

last_memory_pressure = memory_pressure;
if (to_free_bytes > 0) {
last_ms += kIntervalMs;
if (last_ms >= kMaxLastMs) {
LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes
<< ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss
<< ", max_cache_percent " << max_cache_percent << ", release_rate "
<< release_rate << ", memory_pressure " << memory_pressure
<< ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes "
<< to_free_bytes << ", current_aggressive_decommit "
<< current_aggressive_decommit;
MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
last_ms = 0;
}
} else {
last_ms = 0;
}
}
#endif
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct TrackerLimiterGroup {
static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(1000);

std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
bool MemTrackerLimiter::_oom_avoidance {true};

MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit,
RuntimeProfile* profile) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class MemTrackerLimiter final : public MemTracker {
~MemTrackerLimiter();

static bool sys_mem_exceed_limit_check(int64_t bytes) {
if (!_oom_avoidance) {
return false;
}
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
Expand Down Expand Up @@ -109,6 +112,8 @@ class MemTrackerLimiter final : public MemTracker {
// this tracker limiter.
int64_t spare_capacity() const { return _limit - consumption(); }

// Clears the _oom_avoidance flag. sys_mem_exceed_limit_check() returns false
// immediately while the flag is cleared.
static void disable_oom_avoidance() {
    _oom_avoidance = false;
}

public:
// If need to consume the tracker frequently, use it
void cache_consume(int64_t bytes);
Expand Down Expand Up @@ -208,6 +213,7 @@ class MemTrackerLimiter final : public MemTracker {
// Avoid frequent printing.
bool _enable_print_log_usage = false;
static std::atomic<bool> _enable_print_log_process_usage;
static bool _oom_avoidance;

// Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
std::list<MemTrackerLimiter*>::iterator _tracker_limiter_group_it;
Expand Down
19 changes: 8 additions & 11 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,21 +323,18 @@ int main(int argc, char** argv) {
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
!defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
// Change the total TCMalloc thread cache size if necessary.
size_t total_thread_cache_bytes;
if (!MallocExtension::instance()->GetNumericProperty("tcmalloc.max_total_thread_cache_bytes",
&total_thread_cache_bytes)) {
fprintf(stderr, "Failed to get TCMalloc total thread cache size.\n");
}
const size_t kDefaultTotalThreadCacheBytes = 1024 * 1024 * 1024;
if (total_thread_cache_bytes < kDefaultTotalThreadCacheBytes) {
if (!MallocExtension::instance()->SetNumericProperty(
"tcmalloc.max_total_thread_cache_bytes", kDefaultTotalThreadCacheBytes)) {
fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n");
return -1;
}
if (!MallocExtension::instance()->SetNumericProperty("tcmalloc.max_total_thread_cache_bytes",
kDefaultTotalThreadCacheBytes)) {
fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n");
return -1;
}
#endif

if (doris::config::memory_mode == std::string("performance")) {
doris::MemTrackerLimiter::disable_oom_avoidance();
}

std::vector<doris::StorePath> paths;
auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
if (!olap_res) {
Expand Down