Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,13 @@ Status DeltaWriter::cancel() {
return Status::OK();
}

int64_t DeltaWriter::save_mem_consumption_snapshot() {
_mem_consumption_snapshot = mem_consumption();
return _mem_consumption_snapshot;
int64_t DeltaWriter::save_memtable_consumption_snapshot() {
_memtable_consumption_snapshot = memtable_consumption();
return _memtable_consumption_snapshot;
}

int64_t DeltaWriter::get_mem_consumption_snapshot() const {
return _mem_consumption_snapshot;
int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
return _memtable_consumption_snapshot;
}

int64_t DeltaWriter::mem_consumption() const {
Expand All @@ -411,6 +411,13 @@ int64_t DeltaWriter::mem_consumption() const {
return _mem_tracker->consumption();
}

int64_t DeltaWriter::memtable_consumption() const {
if (_mem_table == nullptr) {
return 0;
}
return _mem_table->memory_usage();
}

int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ class DeltaWriter {

int64_t memtable_consumption() const;

int64_t save_mem_consumption_snapshot();
int64_t save_memtable_consumption_snapshot();

int64_t get_mem_consumption_snapshot() const;
int64_t get_memtable_consumption_snapshot() const;

void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);

Expand Down Expand Up @@ -161,8 +161,9 @@ class DeltaWriter {
// use in vectorized load
bool _is_vec;

//only used for std::sort more detail see issue(#9237)
int64_t _mem_consumption_snapshot = 0;
// memory consumption snapshot for current memtable, only
// used for std::sort
int64_t _memtable_consumption_snapshot = 0;

std::unordered_set<int64_t> _unfinished_slave_node;
PSuccessSlaveTabletNodeIds _success_slave_node_ids;
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit, TabletWriterAddResult
// Sort the DeltaWriters by mem consumption in descend order.
std::vector<DeltaWriter*> writers;
for (auto& it : _tablet_writers) {
it.second->save_mem_consumption_snapshot();
it.second->save_memtable_consumption_snapshot();
writers.push_back(it.second);
}
std::sort(writers.begin(), writers.end(), [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
return lhs->get_mem_consumption_snapshot() > rhs->get_mem_consumption_snapshot();
return lhs->get_memtable_consumption_snapshot() > rhs->get_memtable_consumption_snapshot();
});

// Decide which writes should be flushed to reduce mem consumption.
Expand All @@ -228,11 +228,11 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit, TabletWriterAddResult
int counter = 0;
int64_t sum = 0;
for (auto writer : writers) {
if (writer->mem_consumption() <= 0) {
if (writer->memtable_consumption() <= 0) {
break;
}
++counter;
sum += writer->mem_consumption();
sum += writer->memtable_consumption();
if (sum > mem_to_flushed) {
break;
}
Expand Down