Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,10 @@ CONF_mInt32(string_type_length_soft_limit_bytes, "1048576");
CONF_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");

} // namespace config

} // namespace doris
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class ObjectPool {
src->_objects.clear();
}

uint64_t size() {
std::lock_guard<SpinLock> l(_lock);
return _objects.size();
}

private:
ObjectPool(const ObjectPool&) = delete;
void operator=(const ObjectPool&) = delete;
Expand Down
17 changes: 16 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
{
SCOPED_TIMER(_parent->_scan_timer);
// store the object which may can't pass the conjuncts temporarily.
// otherwise, pushed all objects into agg_object_pool directly may lead to OOM.
ObjectPool tmp_object_pool;
// release the memory of the object which can't pass the conjuncts.
ObjectPool unused_object_pool;
while (true) {
// Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break
if (batch->is_full() ||
Expand All @@ -295,9 +300,18 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
_update_realtime_counter();
break;
}

if (tmp_object_pool.size() > 0) {
unused_object_pool.acquire_data(&tmp_object_pool);
}

if (unused_object_pool.size() >= config::object_pool_buffer_size) {
unused_object_pool.clear();
}

// Read one row from reader
auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(),
batch->agg_object_pool(), eof);
&tmp_object_pool, eof);
if (!res.ok()) {
std::stringstream ss;
ss << "Internal Error: read storage fail. res=" << res
Expand Down Expand Up @@ -396,6 +410,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {

// check direct && pushdown conjuncts success then commit tuple
batch->commit_last_row();
batch->agg_object_pool()->acquire_data(&tmp_object_pool);
char* new_tuple = reinterpret_cast<char*>(tuple);
new_tuple += _tuple_desc->byte_size();
tuple = reinterpret_cast<Tuple*>(new_tuple);
Expand Down