Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ CONF_String(rpc_load_balancer, "rr");
// so we set a soft limit, default is 1MB
CONF_mInt32(string_type_length_soft_limit_bytes, "1048576");

// when the size of unused_object_pool is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment for this new config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

CONF_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

Expand Down
5 changes: 5 additions & 0 deletions be/src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class ObjectPool {
for (Element& elem : _objects) elem.delete_fn(elem.obj);
_objects.clear();
}

uint64_t size() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a std::lock_guard<SpinLock> l(_lock);?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

std::lock_guard<SpinLock> l(_lock);
return _objects.size();
}

void acquire_data(ObjectPool* src) {
_objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end());
Expand Down
53 changes: 33 additions & 20 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Status OlapScanner::prepare(
const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
// set limit to reduce end of rowset and segment mem use
_tablet_reader->set_batch_size(
Expand All @@ -74,8 +75,7 @@ Status OlapScanner::prepare(
_version = strtoul(scan_range.version.c_str(), nullptr, 10);
{
std::string err;
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash,
true, &err);
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
if (_tablet.get() == nullptr) {
std::stringstream ss;
ss << "failed to get tablet. tablet_id=" << tablet_id
Expand All @@ -84,7 +84,7 @@ Status OlapScanner::prepare(
return Status::InternalError(ss.str());
}
{
ReadLock rdlock(_tablet->get_header_lock());
std::shared_lock rdlock(_tablet->get_header_lock());
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
if (rowset == nullptr) {
std::stringstream ss;
Expand All @@ -97,9 +97,9 @@ Status OlapScanner::prepare(
// to prevent this case: when there are lots of olap scanners to run for example 10000
// the rowsets maybe compacted when the last olap scanner starts
Version rd_version(0, _version);
OLAPStatus acquire_reader_st =
Status acquire_reader_st =
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
if (acquire_reader_st != OLAP_SUCCESS) {
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
std::stringstream ss;
ss << "failed to initialize storage reader. tablet=" << _tablet->full_name()
Expand All @@ -120,16 +120,16 @@ Status OlapScanner::prepare(

Status OlapScanner::open() {
SCOPED_TIMER(_parent->_reader_init_timer);

SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);

if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) {
_use_pushdown_conjuncts = true;
}

_runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);

auto res = _tablet_reader->init(_tablet_reader_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init reader.[res=%d]", res);
if (!res.ok()) {
std::stringstream ss;
ss << "failed to initialize storage reader. tablet="
<< _tablet_reader_params.tablet->full_name() << ", res=" << res
Expand Down Expand Up @@ -213,10 +213,10 @@ Status OlapScanner::_init_tablet_reader_params(
}

// use _tablet_reader_params.return_columns, because reader use this to merge sort
OLAPStatus res =
Status res =
_read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor.[res=%d]", res);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor.res = " << res;
return Status::InternalError("failed to initialize storage read row cursor");
}
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
Expand Down Expand Up @@ -275,6 +275,7 @@ Status OlapScanner::_init_return_columns() {
}

Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// 2. Allocate Row's Tuple buf
uint8_t* tuple_buf =
batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size());
Expand All @@ -286,6 +287,9 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
{
SCOPED_TIMER(_parent->_scan_timer);
ObjectPool tmp_object_pool; // store the object which may can't pass the conjuncts temporarily. pushed all objects into agg_object_pool directly may lead to OOM.
ObjectPool unused_object_pool; // release the memory of the object which can't pass the conjuncts by lot.

while (true) {
// Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break
if (batch->is_full() ||
Expand All @@ -295,9 +299,17 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
break;
}
// Read one row from reader
if (tmp_object_pool.size() > 0) {
unused_object_pool.acquire_data(&tmp_object_pool);
}

if (unused_object_pool.size() >= config::object_pool_buffer_size) {
unused_object_pool.clear();
}

auto res = _tablet_reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(),
batch->agg_object_pool(), eof);
if (res != OLAP_SUCCESS) {
&tmp_object_pool, eof);
if (!res.ok()) {
std::stringstream ss;
ss << "Internal Error: read storage fail. res=" << res
<< ", tablet=" << _tablet->full_name()
Expand Down Expand Up @@ -377,13 +389,13 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
const TypeDescriptor& item_type = desc->type().children.at(0);
auto pool = batch->tuple_data_pool();
CollectionValue::deep_copy_collection(
slot, item_type, [pool](int size) -> MemFootprint {
int64_t offset = pool->total_allocated_bytes();
uint8_t* data = pool->allocate(size);
return { offset, data };
},
false
);
slot, item_type,
[pool](int size) -> MemFootprint {
int64_t offset = pool->total_allocated_bytes();
uint8_t* data = pool->allocate(size);
return {offset, data};
},
false);
}
// the memory allocate by mem pool has been copied,
// so we should release these memory immediately
Expand All @@ -395,6 +407,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {

// check direct && pushdown conjuncts success then commit tuple
batch->commit_last_row();
batch->agg_object_pool()->acquire_data(&tmp_object_pool);
char* new_tuple = reinterpret_cast<char*>(tuple);
new_tuple += _tuple_desc->byte_size();
tuple = reinterpret_cast<Tuple*>(new_tuple);
Expand Down