Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
_load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
_load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
// If a load channel's memory consumption is no more than 10% of the hard limit, it's not
// worth to reduce memory on it. Since we only reduce 1/3 memory for one load channel,
// for a channel consume 10% of hard limit, we can only release about 3% memory each time,
// it's not quite helpfull to reduce memory pressure.
// In this case we need to pick multiple load channels to reduce memory more effectively.
_load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
_mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
_mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
"LoadChannelMgrTrackerSet");
Expand Down
103 changes: 74 additions & 29 deletions be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class LoadChannelMgr {
std::mutex _lock;
// load id -> load channel
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
Cache* _last_success_channel = nullptr;

// check the total load channel mem consumption of this Backend
Expand All @@ -96,6 +95,14 @@ class LoadChannelMgr {
std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
// By default, we try to reduce memory on the load channel with largest mem consumption,
// but if there are lots of small load channel, even the largest one consumes very
// small memory, in this case we need to pick multiple load channels to reduce memory
// more effectively.
// `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
// memory consumption is big enough.
int64_t _load_channel_min_mem_to_reduce = -1;
bool _soft_reduce_mem_in_progress = false;

// If hard limit reached, one thread will trigger load channel flush,
// other threads should wait on the condition variable.
Expand Down Expand Up @@ -171,77 +178,115 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
// Pick load channel to reduce memory.
std::shared_ptr<LoadChannel> channel;
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
if (_reduce_memory_channel != nullptr &&
_mem_tracker->consumption() < _load_hard_mem_limit &&
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return Status::OK();
}

// We need to pick a LoadChannel to reduce memory usage.
// If `_reduce_memory_channel` is not null, it means the hard limit is
// exceed now, we still need to pick a load channel again. Because
// `_reduce_memory_channel` might not be the largest consumer now.
int64_t max_consume = 0;
// Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
// due to soft limit, and we reached hard limit now, current thread may pick some
// duplicate channels and trigger duplicate reducing memory process.
// But the load channel's reduce memory process is thread safe, only 1 thread can
// reduce memory at the same time, other threads will wait on a condition variable,
// after the reduce-memory work finished, all threads will return.
using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
std::vector<ChannelMemPair> candidate_channels;
int64_t total_consume = 0;
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
if (kv.second->mem_consumption() > max_consume) {
max_consume = kv.second->mem_consumption();
channel = kv.second;
}
int64_t mem = kv.second->mem_consumption();
// save the mem consumption, since the calculation might be expensive.
candidate_channels.push_back(std::make_pair(kv.second, mem));
total_consume += mem;
}
if (max_consume == 0) {

if (candidate_channels.empty()) {
// should not happen, add log to observe
LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed";
LOG(WARNING) << "All load channels are high priority, failed to find suitable"
<< "channels to reduce memory when total load mem limit exceed";
return Status::OK();
}
DCHECK(channel.get() != nullptr);
_reduce_memory_channel = channel;

// sort all load channels, try to find the largest one.
std::sort(candidate_channels.begin(), candidate_channels.end(),
[](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
return lhs.second > rhs.second;
});

int64_t mem_consumption_in_picked_channel = 0;
auto largest_channel = *candidate_channels.begin();
// If some load-channel is big enough, we can reduce it only, try our best to avoid
// reducing small load channels.
if (_load_channel_min_mem_to_reduce > 0 &&
largest_channel.second > _load_channel_min_mem_to_reduce) {
// Pick 1 load channel to reduce memory.
channels_to_reduce_mem.push_back(largest_channel.first);
mem_consumption_in_picked_channel = largest_channel.second;
} else {
// Pick multiple channels to reduce memory.
int64_t mem_to_flushed = total_consume / 3;
for (auto ch : candidate_channels) {
channels_to_reduce_mem.push_back(ch.first);
mem_consumption_in_picked_channel += ch.second;
if (mem_consumption_in_picked_channel >= mem_to_flushed) {
break;
}
}
}

std::ostringstream oss;
if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
oss << "reducing memory of " << *channel << " because total load mem consumption "
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because total load mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
<< " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
_soft_reduce_mem_in_progress = true;
oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << "reducing memory of " << *channel << " because process memory used "
<< PerfCounters::get_vm_rss_str() << " has exceeded limit "
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}

// No matter soft limit or hard limit reached, only 1 thread will wait here,
// if hard limit reached, other threads will pend at the beginning of this
// method.
Status st = channel->handle_mem_exceed_limit(response);
LOG(INFO) << "reduce memory of " << *channel << " finished";
Status st = Status::OK();
for (auto ch : channels_to_reduce_mem) {
uint64_t begin = GetCurrentTimeMicros();
int64_t mem_usage = ch->mem_consumption();
st = ch->handle_mem_exceed_limit(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the last st is returned.

LOG(INFO) << "reduced memory of " << *ch << ", cost "
<< (GetCurrentTimeMicros() - begin) / 1000
<< " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
}

{
std::lock_guard<std::mutex> l(_lock);
Expand All @@ -251,8 +296,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
_should_wait_flush = false;
_wait_flush_cond.notify_all();
}
if (_reduce_memory_channel == channel) {
_reduce_memory_channel = nullptr;
if (_soft_reduce_mem_in_progress) {
_soft_reduce_mem_in_progress = false;
}
}
return st;
Expand Down