diff --git a/be/src/common/config.h b/be/src/common/config.h index 5bc62f01235117..4d59fd94d4e9d0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -732,6 +732,10 @@ CONF_mInt32(string_type_length_soft_limit_bytes, "1048576"); CONF_Validator(string_type_length_soft_limit_bytes, [](const int config) -> bool { return config > 0 && config <= 2147483643; }); +// Used by the olap scanner to save memory: once the size of unused_object_pool reaches +// object_pool_buffer_size (compared with >=), unused_object_pool is cleared. +CONF_Int32(object_pool_buffer_size, "100"); + } // namespace config } // namespace doris diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index c8062aa590821b..865ee87314dee5 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -60,6 +60,11 @@ class ObjectPool { src->_objects.clear(); } + uint64_t size() { + std::lock_guard l(_lock); + return _objects.size(); + } + private: ObjectPool(const ObjectPool&) = delete; void operator=(const ObjectPool&) = delete; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index a905dc98413f63..53f9af57a8e5d0 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -287,6 +287,11 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); + // Temporarily stores objects that may not pass the conjuncts. + // Pushing all objects into agg_object_pool directly could lead to OOM. + ObjectPool tmp_object_pool; + // Collects objects that did not pass the conjuncts so their memory can be released in batches. 
+ ObjectPool unused_object_pool; while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break if (batch->is_full() || @@ -295,9 +300,18 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { _update_realtime_counter(); break; } + + if (tmp_object_pool.size() > 0) { + unused_object_pool.acquire_data(&tmp_object_pool); + } + + if (unused_object_pool.size() >= config::object_pool_buffer_size) { + unused_object_pool.clear(); + } + // Read one row from reader auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), - batch->agg_object_pool(), eof); + &tmp_object_pool, eof); if (!res.ok()) { std::stringstream ss; ss << "Internal Error: read storage fail. res=" << res @@ -396,6 +410,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // check direct && pushdown conjuncts success then commit tuple batch->commit_last_row(); + batch->agg_object_pool()->acquire_data(&tmp_object_pool); char* new_tuple = reinterpret_cast(tuple); new_tuple += _tuple_desc->byte_size(); tuple = reinterpret_cast(new_tuple);