Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,8 @@ DEFINE_mInt64(mow_primary_key_index_max_size_in_memory, "52428800");
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DEFINE_mInt32(publish_version_gap_logging_threshold, "200");
// get agg by cache for mow table
DEFINE_mBool(enable_mow_get_agg_by_cache, "true");

// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,8 @@ DECLARE_mInt64(mow_primary_key_index_max_size_in_memory);
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DECLARE_mInt32(publish_version_gap_logging_threshold);
// get agg by cache for mow table
DECLARE_mBool(enable_mow_get_agg_by_cache);

// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
Expand Down
58 changes: 57 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,19 +928,29 @@ void TabletMeta::revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap
}

void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
size_t rowset_cache_version_size = 0;
auto it = _stale_rs_metas.begin();
while (it != _stale_rs_metas.end()) {
if ((*it)->version() == version) {
if (_enable_unique_key_merge_on_write) {
// remove rowset delete bitmap
delete_bitmap().remove({(*it)->rowset_id(), 0, 0},
{(*it)->rowset_id(), UINT32_MAX, 0});
rowset_cache_version_size =
delete_bitmap().remove_rowset_cache_version((*it)->rowset_id());
}
it = _stale_rs_metas.erase(it);
} else {
it++;
}
}
if (_enable_unique_key_merge_on_write) {
DCHECK(rowset_cache_version_size <= _rs_metas.size() + _stale_rs_metas.size())
<< "tablet: " << _tablet_id
<< ", rowset_cache_version size: " << rowset_cache_version_size
<< ", _rs_metas size: " << _rs_metas.size()
<< ", _stale_rs_metas size:" << _stale_rs_metas.size();
}
}

RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const {
Expand Down Expand Up @@ -1283,6 +1293,24 @@ bool DeleteBitmap::has_calculated_for_multi_segments(const RowsetId& rowset_id)
return contains({rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}, ROWSET_SENTINEL_MARK);
}

size_t DeleteBitmap::remove_rowset_cache_version(const RowsetId& rowset_id) {
std::lock_guard l(_rowset_cache_version_lock);
_rowset_cache_version.erase(rowset_id);
return _rowset_cache_version.size();
}

DeleteBitmap::Version DeleteBitmap::_get_rowset_cache_version(const BitmapKey& bmk) const {
std::shared_lock l(_rowset_cache_version_lock);
if (auto it = _rowset_cache_version.find(std::get<0>(bmk)); it != _rowset_cache_version.end()) {
auto& segment_cache_version = it->second;
if (auto it1 = segment_cache_version.find(std::get<1>(bmk));
it1 != segment_cache_version.end()) {
return it1->second;
}
}
return 0;
}

// We cannot just copy the underlying memory to construct a string
// due to equivalent objects may have different padding bytes.
// Reading padding bytes is undefined behavior, neither copy nor
Expand Down Expand Up @@ -1314,9 +1342,28 @@ std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) co
// of cache entries in some cases?
if (val == nullptr) { // Renew if needed, put a new Value to cache
val = new AggCache::Value();
Version start_version =
config::enable_mow_get_agg_by_cache ? _get_rowset_cache_version(bmk) : 0;
if (start_version > 0) {
Cache::Handle* handle2 = _agg_cache->repr()->lookup(
agg_cache_key(_tablet_id, {std::get<0>(bmk), std::get<1>(bmk), start_version}));
if (handle2 == nullptr) {
start_version = 0;
} else {
val->bitmap |=
reinterpret_cast<AggCache::Value*>(_agg_cache->repr()->value(handle2))
->bitmap;
_agg_cache->repr()->release(handle2);
VLOG_DEBUG << "get agg cache version=" << start_version
<< " for tablet=" << _tablet_id
<< ", rowset=" << std::get<0>(bmk).to_string()
<< ", segment=" << std::get<1>(bmk);
start_version += 1;
}
}
{
std::shared_lock l(lock);
DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), start_version};
for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
auto& [k, bm] = *it;
if (std::get<0>(k) != std::get<0>(bmk) || std::get<1>(k) != std::get<1>(bmk) ||
Expand All @@ -1328,6 +1375,15 @@ std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) co
}
size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value);
handle = _agg_cache->repr()->insert(key, val, charge, charge, CachePriority::NORMAL);
if (config::enable_mow_get_agg_by_cache && !val->bitmap.isEmpty()) {
std::lock_guard l(_rowset_cache_version_lock);
// this version is already agg
_rowset_cache_version[std::get<0>(bmk)][std::get<1>(bmk)] = std::get<2>(bmk);
VLOG_DEBUG << "set agg cache version=" << std::get<2>(bmk)
<< " for tablet=" << _tablet_id
<< ", rowset=" << std::get<0>(bmk).to_string()
<< ", segment=" << std::get<1>(bmk);
}
}

// It is natural for the cache to reclaim the underlying memory
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,9 @@ class DeleteBitmap {

bool has_calculated_for_multi_segments(const RowsetId& rowset_id) const;

// return the size of the map
size_t remove_rowset_cache_version(const RowsetId& rowset_id);

class AggCachePolicy : public LRUCachePolicy {
public:
AggCachePolicy(size_t capacity)
Expand Down Expand Up @@ -588,8 +591,12 @@ class DeleteBitmap {
};

private:
DeleteBitmap::Version _get_rowset_cache_version(const BitmapKey& bmk) const;

mutable std::shared_ptr<AggCache> _agg_cache;
int64_t _tablet_id;
mutable std::shared_mutex _rowset_cache_version_lock;
mutable std::map<RowsetId, std::map<SegmentId, Version>> _rowset_cache_version;
// <version, <tablet_id, BitmapKeyStart, BitmapKeyEnd>>
std::map<std::string,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>>
Expand Down
Loading