diff --git a/be/src/common/config.h b/be/src/common/config.h index fe92b6b3ea44e0..601d36fd7d0c63 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -729,6 +729,8 @@ CONF_String(rpc_load_balancer, "rr"); // so we set a soft limit, default is 1MB CONF_mInt32(string_type_length_soft_limit_bytes, "1048576"); +// when the number of objects in unused_object_pool reaches object_pool_buffer_size, release the objects in the unused_object_pool. +CONF_Int32(object_pool_buffer_size, "100"); CONF_Validator(string_type_length_soft_limit_bytes, [](const int config) -> bool { return config > 0 && config <= 2147483643; }); diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index c8062aa590821b..2683c92ae1e2db 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -54,6 +54,11 @@ class ObjectPool { for (Element& elem : _objects) elem.delete_fn(elem.obj); _objects.clear(); } + + uint64_t size() { + std::lock_guard l(_lock); + return _objects.size(); + } void acquire_data(ObjectPool* src) { _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end()); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 3a675d04f4d4b1..ec6634666a86f0 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -60,6 +60,7 @@ Status OlapScanner::prepare( const std::vector& filters, const std::vector>>& bloom_filters) { + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); set_tablet_reader(); // set limit to reduce end of rowset and segment mem use _tablet_reader->set_batch_size( @@ -74,8 +75,7 @@ Status OlapScanner::prepare( _version = strtoul(scan_range.version.c_str(), nullptr, 10); { std::string err; - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, - true, &err); + _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (_tablet.get() == nullptr) { std::stringstream ss; ss << "failed to get tablet. 
tablet_id=" << tablet_id @@ -84,7 +84,7 @@ Status OlapScanner::prepare( return Status::InternalError(ss.str()); } { - ReadLock rdlock(_tablet->get_header_lock()); + std::shared_lock rdlock(_tablet->get_header_lock()); const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); if (rowset == nullptr) { std::stringstream ss; @@ -97,9 +97,9 @@ Status OlapScanner::prepare( // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts Version rd_version(0, _version); - OLAPStatus acquire_reader_st = + Status acquire_reader_st = _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); - if (acquire_reader_st != OLAP_SUCCESS) { + if (!acquire_reader_st.ok()) { LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; std::stringstream ss; ss << "failed to initialize storage reader. tablet=" << _tablet->full_name() @@ -120,7 +120,8 @@ Status OlapScanner::prepare( Status OlapScanner::open() { SCOPED_TIMER(_parent->_reader_init_timer); - + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) { _use_pushdown_conjuncts = true; } @@ -128,8 +129,7 @@ Status OlapScanner::open() { _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); auto res = _tablet_reader->init(_tablet_reader_params); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init reader.[res=%d]", res); + if (!res.ok()) { std::stringstream ss; ss << "failed to initialize storage reader. 
tablet=" << _tablet_reader_params.tablet->full_name() << ", res=" << res @@ -213,10 +213,10 @@ Status OlapScanner::_init_tablet_reader_params( } // use _tablet_reader_params.return_columns, because reader use this to merge sort - OLAPStatus res = + Status res = _read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init row cursor.[res=%d]", res); + if (!res.ok()) { + LOG(WARNING) << "fail to init row cursor.res = " << res; return Status::InternalError("failed to initialize storage read row cursor"); } _read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); @@ -275,6 +275,7 @@ Status OlapScanner::_init_return_columns() { } Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // 2. Allocate Row's Tuple buf uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size()); @@ -286,6 +287,9 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); + ObjectPool tmp_object_pool; // temporarily holds the objects of the row being read, which may not pass the conjuncts. Pushing all objects into agg_object_pool directly may lead to OOM. + ObjectPool unused_object_pool; // collects the objects of rows that did not pass the conjuncts so that their memory is released in batches. 
+ while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break if (batch->is_full() || @@ -295,9 +299,17 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { break; } // Read one row from reader + if (tmp_object_pool.size() > 0) { + unused_object_pool.acquire_data(&tmp_object_pool); + } + + if (unused_object_pool.size() >= config::object_pool_buffer_size) { + unused_object_pool.clear(); + } + auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), - batch->agg_object_pool(), eof); - if (res != OLAP_SUCCESS) { + &tmp_object_pool, eof); + if (!res.ok()) { std::stringstream ss; ss << "Internal Error: read storage fail. res=" << res << ", tablet=" << _tablet->full_name() @@ -377,13 +389,13 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { const TypeDescriptor& item_type = desc->type().children.at(0); auto pool = batch->tuple_data_pool(); CollectionValue::deep_copy_collection( - slot, item_type, [pool](int size) -> MemFootprint { - int64_t offset = pool->total_allocated_bytes(); - uint8_t* data = pool->allocate(size); - return { offset, data }; - }, - false - ); + slot, item_type, + [pool](int size) -> MemFootprint { + int64_t offset = pool->total_allocated_bytes(); + uint8_t* data = pool->allocate(size); + return {offset, data}; + }, + false); } // the memory allocate by mem pool has been copied, // so we should release these memory immediately @@ -395,6 +407,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // check direct && pushdown conjuncts success then commit tuple batch->commit_last_row(); + batch->agg_object_pool()->acquire_data(&tmp_object_pool); char* new_tuple = reinterpret_cast(tuple); new_tuple += _tuple_desc->byte_size(); tuple = reinterpret_cast(new_tuple);