Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");

DEFINE_mBool(disable_memory_gc, "false");

DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, "false");

DEFINE_mInt64(large_memory_check_bytes, "2147483648");

DEFINE_mBool(enable_memory_orphan_check, "true");
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,14 @@ DECLARE_mString(process_full_gc_size);
// used memory and the exec_mem_limit will be canceled.
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DECLARE_mBool(enable_query_memory_overcommit);
//waibibabu

// gc will release cache, cancel task, and task will wait for gc to release memory,
// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true
DECLARE_mBool(disable_memory_gc);

// Allocator check failed log stacktrace if not catch exception
DECLARE_mBool(enable_stacktrace_in_allocator_check_failed);

// malloc or new large memory larger than large_memory_check_bytes, default 2G,
// will print a warning containing the stacktrace, but not prevent memory alloc.
// If is -1, disable large memory check.
Expand Down
35 changes: 34 additions & 1 deletion be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,39 @@
#include "runtime/exec_env.h"

namespace doris {
template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(),
_size(b),
_capacity(b),
_use_cache(use_cache),
_page_type(page_type) {
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
} else {
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}

template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
TAllocator::free(_data, _capacity);
} else {
TAllocator::free(_data, _capacity);
}
}
}

template class PageBase<Allocator<true>>;
template class PageBase<Allocator<false>>;

StoragePageCache* StoragePageCache::create_global_cache(size_t capacity,
int32_t index_cache_percentage,
int64_t pk_index_cache_capacity,
Expand Down Expand Up @@ -70,7 +103,7 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
}

auto* cache = _get_page_cache(page_type);
auto* lru_handle = cache->insert_no_tracking(key.encode(), data, data->capacity(), priority);
auto* lru_handle = cache->insert(key.encode(), data, data->capacity(), 0, priority);
*handle = PageCacheHandle(cache, lru_handle);
}

Expand Down
54 changes: 18 additions & 36 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,10 @@ template <typename TAllocator>
class PageBase : private TAllocator, public LRUCacheValueBase {
public:
PageBase() = default;

PageBase(size_t b, const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: LRUCacheValueBase(), _size(b), _capacity(b), _mem_tracker_by_allocator(mem_tracker) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}

PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type);
PageBase(const PageBase&) = delete;
PageBase& operator=(const PageBase&) = delete;

~PageBase() override {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
~PageBase() override;

char* data() { return _data; }
size_t size() { return _size; }
Expand All @@ -73,7 +60,8 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
bool _use_cache;
segment_v2::PageTypePB _page_type;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down Expand Up @@ -105,34 +93,28 @@ class StoragePageCache {
}
};

class DataPageCache : public LRUCachePolicy {
class DataPageCache : public LRUCachePolicyTrackingAllocator {
public:
DataPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity,
LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec,
num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, LRUCacheType::SIZE,
config::data_page_cache_stale_sweep_time_sec, num_shards) {}
};

class IndexPageCache : public LRUCachePolicy {
class IndexPageCache : public LRUCachePolicyTrackingAllocator {
public:
IndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity,
LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec,
num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, LRUCacheType::SIZE,
config::index_page_cache_stale_sweep_time_sec, num_shards) {}
};

class PKIndexPageCache : public LRUCachePolicy {
class PKIndexPageCache : public LRUCachePolicyTrackingAllocator {
public:
PKIndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity,
LRUCacheType::SIZE,
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, LRUCacheType::SIZE,
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
};

static constexpr uint32_t kDefaultNumShards = 16;
Expand Down Expand Up @@ -169,7 +151,7 @@ class StoragePageCache {
segment_v2::PageTypePB page_type, bool in_memory = false);

std::shared_ptr<MemTrackerLimiter> mem_tracker(segment_v2::PageTypePB page_type) {
return _get_page_cache(page_type)->mem_tracker_by_allocator();
return _get_page_cache(page_type)->mem_tracker();
}

private:
Expand All @@ -183,7 +165,7 @@ class StoragePageCache {
// delete bitmap in unique key with mow
std::unique_ptr<PKIndexPageCache> _pk_index_page_cache;

LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) {
LRUCachePolicyTrackingAllocator* _get_page_cache(segment_v2::PageTypePB page_type) {
switch (page_type) {
case segment_v2::DATA_PAGE: {
return _data_page_cache.get();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
* @return Status
*/
Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) override {
bool _use_cache, segment_v2::PageTypePB page_type) override {
size_t num_elements, compressed_size, num_element_after_padding;
int size_of_element;

Expand Down Expand Up @@ -67,7 +67,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE +
num_element_after_padding * size_of_element + size_of_tail;
std::unique_ptr<DataPage> decoded_page =
std::make_unique<DataPage>(decoded_slice.size, mem_tracker);
std::make_unique<DataPage>(decoded_slice.size, _use_cache, page_type);
decoded_slice.data = decoded_page->data();

if constexpr (USED_IN_DICT_ENCODING) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/encoding_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum EncodingTypePB : int;
class DataPagePreDecoder {
public:
virtual Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) = 0;
bool _use_cache, segment_v2::PageTypePB page_type) = 0;
virtual ~DataPagePreDecoder() = default;
};

Expand Down
10 changes: 3 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), bitmap->getSizeInBytes(),
CachePriority::NORMAL);
auto* lru_handle = LRUCachePolicyTrackingManual::insert(
key.encode(), (void*)cache_value_ptr.release(), bitmap->getSizeInBytes(),
bitmap->getSizeInBytes(), CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(this, lru_handle);
}

int64_t InvertedIndexQueryCache::mem_consumption() {
return LRUCachePolicy::mem_consumption();
}

} // namespace doris::segment_v2
42 changes: 20 additions & 22 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ class InvertedIndexSearcherCache {
size_t size = 0;
int64_t last_visit_time;

CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE) {}
CacheValue() = default;
explicit CacheValue(IndexSearcherPtr searcher, size_t mem_size, int64_t visit_time)
: LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE),
index_searcher(std::move(searcher)) {
: index_searcher(std::move(searcher)) {
size = mem_size;
last_visit_time = visit_time;
}
Expand Down Expand Up @@ -100,23 +99,23 @@ class InvertedIndexSearcherCache {
private:
InvertedIndexSearcherCache() = default;

class InvertedIndexSearcherCachePolicy : public LRUCachePolicy {
class InvertedIndexSearcherCachePolicy : public LRUCachePolicyTrackingManual {
public:
InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
uint32_t element_count_capacity)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
element_count_capacity, true) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
capacity, LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec,
num_shards, element_count_capacity, true) {}
InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
uint32_t element_count_capacity,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, true) {}
: LRUCachePolicyTrackingManual(
CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
num_shards, element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, true) {}
};
// Insert a cache entry by key.
// And the cache entry will be returned in handle.
Expand Down Expand Up @@ -180,8 +179,10 @@ class InvertedIndexCacheHandle {

class InvertedIndexQueryCacheHandle;

class InvertedIndexQueryCache : public LRUCachePolicy {
class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual {
public:
using LRUCachePolicyTrackingManual::insert;

// cache key
struct CacheKey {
io::Path index_path; // index file path
Expand All @@ -208,14 +209,12 @@ class InvertedIndexQueryCache : public LRUCachePolicy {

class CacheValue : public LRUCacheValueBase {
public:
CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE) {}

std::shared_ptr<roaring::Roaring> bitmap;
};

// Create global instance of this class
static InvertedIndexQueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
InvertedIndexQueryCache* res = new InvertedIndexQueryCache(capacity, num_shards);
auto* res = new InvertedIndexQueryCache(capacity, num_shards);
return res;
}

Expand All @@ -228,16 +227,15 @@ class InvertedIndexQueryCache : public LRUCachePolicy {
InvertedIndexQueryCache() = delete;

InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity,
LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
num_shards) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE,
capacity, LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec,
num_shards) {}

bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);

void insert(const CacheKey& key, std::shared_ptr<roaring::Roaring> bitmap,
InvertedIndexQueryCacheHandle* handle);

int64_t mem_consumption();
};

class InvertedIndexQueryCacheHandle {
Expand Down
14 changes: 4 additions & 10 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,9 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
opts.file_reader->path().native());
}

std::shared_ptr<MemTrackerLimiter> page_mem_tracker;
if (opts.use_page_cache && cache) {
page_mem_tracker = cache->mem_tracker(opts.type);
} else {
page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}

// hold compressed page at first, reset to decompressed page later
std::unique_ptr<DataPage> page = std::make_unique<DataPage>(page_size, page_mem_tracker);
std::unique_ptr<DataPage> page =
std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
Slice page_slice(page->data(), page_size);
{
SCOPED_RAW_TIMER(&opts.stats->io_ns);
Expand Down Expand Up @@ -190,7 +184,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
}
SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
footer->uncompressed_size() + footer_size + 4, page_mem_tracker);
footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

// decompress page body
Slice compressed_body(page_slice.data, body_size);
Expand Down Expand Up @@ -218,7 +212,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
if (pre_decoder) {
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice, footer->data_page_footer().nullmap_size() + footer_size + 4,
page_mem_tracker));
opts.use_page_cache, opts.type));
}
}

Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// eliminating the need for frequent allocation and deallocation during usage.
// This caching mechanism proves immensely advantageous, particularly in scenarios
// with high concurrency, where queries are executed simultaneously.
class SchemaCache : public LRUCachePolicy {
class SchemaCache : public LRUCachePolicyTrackingManual {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };

Expand Down Expand Up @@ -104,17 +104,16 @@ class SchemaCache : public LRUCachePolicy {

class CacheValue : public LRUCacheValueBase {
public:
CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::SCHEMA_CACHE) {}

Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};

SchemaCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::SCHEMA_CACHE, capacity,
LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}

private:
static constexpr char SCHEMA_DELIMITER = '-';
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle*

void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value,
SegmentCacheHandle* handle) {
auto* lru_handle =
LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(),
value.segment->meta_mem_usage(), CachePriority::NORMAL);
auto* lru_handle = LRUCachePolicyTrackingManual::insert(
key.encode(), &value, value.segment->meta_mem_usage(), value.segment->meta_mem_usage(),
CachePriority::NORMAL);
handle->push_segment(this, lru_handle);
}

Expand Down
Loading