From ea33000b1ff4c040e3512857dcfdf7111f551d8f Mon Sep 17 00:00:00 2001 From: "kaijion.niu" Date: Wed, 8 Dec 2021 23:36:53 +0800 Subject: [PATCH 01/10] fix typo --- be/src/runtime/etl_job_mgr.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index 155141de00c8a8..95b5539a3da2e5 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -248,7 +248,7 @@ Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* r // failed information if (_failed_jobs.exists(id)) { EtlJobCtx ctx; - _success_jobs.get(id, &ctx); + _failed_jobs.get(id, &ctx); result->status.__set_status_code(TStatusCode::OK); result->__set_etl_state(TEtlState::CANCELLED); From e573d62254291a22fa0d688a97983e4f2e8a8cab Mon Sep 17 00:00:00 2001 From: "kaijioh.niu" Date: Thu, 9 Dec 2021 21:24:08 +0800 Subject: [PATCH 02/10] fix typo --- be/src/runtime/etl_job_mgr.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index 155141de00c8a8..95b5539a3da2e5 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -248,7 +248,7 @@ Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* r // failed information if (_failed_jobs.exists(id)) { EtlJobCtx ctx; - _success_jobs.get(id, &ctx); + _failed_jobs.get(id, &ctx); result->status.__set_status_code(TStatusCode::OK); result->__set_etl_state(TEtlState::CANCELLED); From 82173715ffd4c051c0bcb33c6871b1f000d0ad0d Mon Sep 17 00:00:00 2001 From: "kaijioh.niu" Date: Wed, 30 Mar 2022 20:40:58 +0800 Subject: [PATCH 03/10] bitmap oom --- be/src/common/object_pool.h | 13 +++++++++++-- be/src/exec/olap_scanner.cpp | 3 +++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index c8062aa590821b..a4aef925150e89 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -19,7 +19,7 @@ #define DORIS_BE_SRC_COMMON_COMMON_OBJECT_POOL_H #include -#include +#include #include "util/spinlock.h" @@ -55,6 +55,15 @@ class ObjectPool { _objects.clear(); } + void remove_last_one() { + std::lock_guard l(_lock); + if (!_objects.empty()) { + Element& elem = _objects.back(); + elem.delete_fn(elem.obj); + _objects.pop_back(); + } + } + void acquire_data(ObjectPool* src) { _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end()); src->_objects.clear(); @@ -73,7 +82,7 @@ class ObjectPool { DeleteFn delete_fn; }; - std::vector _objects; + std::list _objects; SpinLock _lock; }; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 3a675d04f4d4b1..7a0f44f309ca9b 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -335,6 +335,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // make sure to reset null indicators since we're overwriting // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); + batch->agg_object_pool()->remove_last_one(); break; } } else { @@ -343,6 +344,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // make sure to reset null indicators since we're overwriting // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); + batch->agg_object_pool()->remove_last_one(); break; } } @@ -357,6 +359,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); _num_rows_pushed_cond_filtered++; + batch->agg_object_pool()->remove_last_one(); break; } } From 1b232cb0372e0716b47c9066243e225e3ec5d37d Mon Sep 17 00:00:00 2001 From: "kaijioh.niu" Date: Thu, 31 Mar 2022 21:09:04 +0800 Subject: [PATCH 04/10] bitmap oom --- be/src/exec/olap_scanner.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 3a675d04f4d4b1..59ba44e9b58aa0 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -286,6 +286,8 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); + ObjectPool tmp_object_pool; + while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break if (batch->is_full() || @@ -295,8 +297,9 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { break; } // Read one row from reader + tmp_object_pool.clear(); auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), - batch->agg_object_pool(), eof); + &tmp_object_pool, eof); if (res != OLAP_SUCCESS) { std::stringstream ss; ss << "Internal Error: read storage fail. res=" << res @@ -395,6 +398,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // check direct && pushdown conjuncts success then commit tuple batch->commit_last_row(); + batch->agg_object_pool()->acquire_data(&tmp_object_pool); char* new_tuple = reinterpret_cast(tuple); new_tuple += _tuple_desc->byte_size(); tuple = reinterpret_cast(new_tuple); From 5056b9b31aeca4e9e1dffb09c9498bef4a8cd3e8 Mon Sep 17 00:00:00 2001 From: "kaijioh.niu" Date: Thu, 31 Mar 2022 21:17:17 +0800 Subject: [PATCH 05/10] bitmap oom --- be/src/common/object_pool.h | 13 ++----------- be/src/exec/olap_scanner.cpp | 3 --- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index a4aef925150e89..c8062aa590821b 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -19,7 +19,7 @@ #define DORIS_BE_SRC_COMMON_COMMON_OBJECT_POOL_H #include -#include +#include #include "util/spinlock.h" @@ -55,15 +55,6 @@ class ObjectPool { _objects.clear(); } - void remove_last_one() { - std::lock_guard l(_lock); - if (!_objects.empty()) { - Element& elem = _objects.back(); - elem.delete_fn(elem.obj); - _objects.pop_back(); - } - } - void acquire_data(ObjectPool* src) { _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end()); src->_objects.clear(); @@ -82,7 +73,7 @@ class ObjectPool { DeleteFn delete_fn; }; - std::list _objects; + std::vector _objects; SpinLock _lock; }; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index dc7e87cc6bfb1f..59ba44e9b58aa0 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -338,7 +338,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // make sure to reset null indicators since we're overwriting // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); - batch->agg_object_pool()->remove_last_one(); break; } } else { @@ -347,7 +346,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // make sure to reset null indicators since we're overwriting // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); - batch->agg_object_pool()->remove_last_one(); break; } } @@ -362,7 +360,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // the tuple assembled for the previous row tuple->init(_tuple_desc->byte_size()); _num_rows_pushed_cond_filtered++; - batch->agg_object_pool()->remove_last_one(); break; } } From 5d7b614977ffd9ca4121ac793d25f721ae724ac8 Mon Sep 17 00:00:00 2001 From: "kaijioh.niu" Date: Tue, 5 Apr 2022 23:36:06 +0800 Subject: [PATCH 06/10] bitmap oom fix --- be/src/common/config.h | 1 + be/src/common/object_pool.h | 4 ++++ be/src/exec/olap_scanner.cpp | 10 +++++++++- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index fe92b6b3ea44e0..a381138eb18669 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -729,6 +729,7 @@ CONF_String(rpc_load_balancer, "rr"); // so we set a soft limit, default is 1MB CONF_mInt32(string_type_length_soft_limit_bytes, "1048576"); +CONF_Int32(object_pool_buffer_size, "100"); CONF_Validator(string_type_length_soft_limit_bytes, [](const int config) -> bool { return config > 0 && config <= 2147483643; }); diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index c8062aa590821b..bdff0f2424d953 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -54,6 +54,10 @@ class ObjectPool { for (Element& elem : _objects) elem.delete_fn(elem.obj); _objects.clear(); } + + uint64_t size() { + return _objects.size(); + } void acquire_data(ObjectPool* src) { _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end()); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 59ba44e9b58aa0..1c5a6ae7a1c55a 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -287,6 +287,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { { SCOPED_TIMER(_parent->_scan_timer); ObjectPool tmp_object_pool; + ObjectPool unused_object_pool; while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break @@ -297,7 +298,14 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { break; } // Read one row from reader - tmp_object_pool.clear(); + if (tmp_object_pool.size() > 0) { + unused_object_pool.acquire_data(&tmp_object_pool); + } + + if (unused_object_pool.size() >= config::object_pool_buffer_size) { + unused_object_pool.clear(); + } + auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), &tmp_object_pool, eof); if (res != OLAP_SUCCESS) { From f10f02e6fc381da5aefd4ac7c6f549c4dca957cc Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:34:29 +0800 Subject: [PATCH 07/10] bitmap oom fix --- be/src/exec/olap_scanner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 1c5a6ae7a1c55a..2e4d2d821decf1 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -286,8 +286,8 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); - ObjectPool tmp_object_pool; - ObjectPool unused_object_pool; + ObjectPool tmp_object_pool; // store the object which may can't pass the conjuncts temporarily. pushed all objects into agg_object_pool directly may lead to OOM. + ObjectPool unused_object_pool; // release the memory of the object which can't pass the conjuncts by lot. while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break From 6cdc89ba5032986f5d7cf66f36f682dd7807ae01 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 23:35:16 +0800 Subject: [PATCH 08/10] Update olap_scanner.cpp --- be/src/exec/olap_scanner.cpp | 39 ++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 2e4d2d821decf1..ec6634666a86f0 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -60,6 +60,7 @@ Status OlapScanner::prepare( const std::vector& filters, const std::vector>>& bloom_filters) { + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); set_tablet_reader(); // set limit to reduce end of rowset and segment mem use _tablet_reader->set_batch_size( @@ -74,8 +75,7 @@ Status OlapScanner::prepare( _version = strtoul(scan_range.version.c_str(), nullptr, 10); { std::string err; - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, - true, &err); + _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (_tablet.get() == nullptr) { std::stringstream ss; ss << "failed to get tablet. tablet_id=" << tablet_id @@ -84,7 +84,7 @@ Status OlapScanner::prepare( return Status::InternalError(ss.str()); } { - ReadLock rdlock(_tablet->get_header_lock()); + std::shared_lock rdlock(_tablet->get_header_lock()); const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); if (rowset == nullptr) { std::stringstream ss; @@ -97,9 +97,9 @@ Status OlapScanner::prepare( // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts Version rd_version(0, _version); - OLAPStatus acquire_reader_st = + Status acquire_reader_st = _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); - if (acquire_reader_st != OLAP_SUCCESS) { + if (!acquire_reader_st.ok()) { LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; std::stringstream ss; ss << "failed to initialize storage reader. tablet=" << _tablet->full_name() @@ -120,7 +120,8 @@ Status OlapScanner::prepare( Status OlapScanner::open() { SCOPED_TIMER(_parent->_reader_init_timer); - + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) { _use_pushdown_conjuncts = true; } @@ -128,8 +129,7 @@ Status OlapScanner::open() { _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); auto res = _tablet_reader->init(_tablet_reader_params); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init reader.[res=%d]", res); + if (!res.ok()) { std::stringstream ss; ss << "failed to initialize storage reader. tablet=" << _tablet_reader_params.tablet->full_name() << ", res=" << res @@ -213,10 +213,10 @@ Status OlapScanner::_init_tablet_reader_params( } // use _tablet_reader_params.return_columns, because reader use this to merge sort - OLAPStatus res = + Status res = _read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init row cursor.[res=%d]", res); + if (!res.ok()) { + LOG(WARNING) << "fail to init row cursor.res = " << res; return Status::InternalError("failed to initialize storage read row cursor"); } _read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); @@ -275,6 +275,7 @@ Status OlapScanner::_init_return_columns() { } Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // 2. Allocate Row's Tuple buf uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size()); @@ -308,7 +309,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), &tmp_object_pool, eof); - if (res != OLAP_SUCCESS) { + if (!res.ok()) { std::stringstream ss; ss << "Internal Error: read storage fail. res=" << res << ", tablet=" << _tablet->full_name() @@ -388,13 +389,13 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { const TypeDescriptor& item_type = desc->type().children.at(0); auto pool = batch->tuple_data_pool(); CollectionValue::deep_copy_collection( - slot, item_type, [pool](int size) -> MemFootprint { - int64_t offset = pool->total_allocated_bytes(); - uint8_t* data = pool->allocate(size); - return { offset, data }; - }, - false - ); + slot, item_type, + [pool](int size) -> MemFootprint { + int64_t offset = pool->total_allocated_bytes(); + uint8_t* data = pool->allocate(size); + return {offset, data}; + }, + false); } // the memory allocate by mem pool has been copied, // so we should release these memory immediately From 91bf4130bb19652e4698561ef14235cc2763b799 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 23:36:17 +0800 Subject: [PATCH 09/10] Update object_pool.h --- be/src/common/object_pool.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index bdff0f2424d953..2683c92ae1e2db 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -56,6 +56,7 @@ class ObjectPool { } uint64_t size() { + std::lock_guard l(_lock); return _objects.size(); } From 98b139b674514f1a888ceccccd0f6c9eef8e9efc Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 23:40:32 +0800 Subject: [PATCH 10/10] Update config.h --- be/src/common/config.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index a381138eb18669..601d36fd7d0c63 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -729,6 +729,7 @@ CONF_String(rpc_load_balancer, "rr"); // so we set a soft limit, default is 1MB CONF_mInt32(string_type_length_soft_limit_bytes, "1048576"); +// when the size of unused_object_pool is greater than object_pool_buffer_size, release the object in the unused_object_pool. CONF_Int32(object_pool_buffer_size, "100"); CONF_Validator(string_type_length_soft_limit_bytes, [](const int config) -> bool { return config > 0 && config <= 2147483643; });