Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,29 +859,40 @@ void TabletMeta::add_rowsets_unchecked(const std::vector<RowsetSharedPtr>& to_ad

void TabletMeta::delete_rs_meta_by_version(const Version& version,
std::vector<RowsetMetaSharedPtr>* deleted_rs_metas) {
size_t rowset_cache_version_size = 0;
auto it = _rs_metas.begin();
while (it != _rs_metas.end()) {
if ((*it)->version() == version) {
if (deleted_rs_metas != nullptr) {
deleted_rs_metas->push_back(*it);
}
_rs_metas.erase(it);
if (_enable_unique_key_merge_on_write) {
rowset_cache_version_size =
_delete_bitmap->remove_rowset_cache_version((*it)->rowset_id());
}
return;
} else {
++it;
}
}
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}

void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
const std::vector<RowsetMetaSharedPtr>& to_delete,
bool same_version) {
size_t rowset_cache_version_size = 0;
// Remove to_delete rowsets from _rs_metas
for (auto rs_to_del : to_delete) {
auto it = _rs_metas.begin();
while (it != _rs_metas.end()) {
if (rs_to_del->version() == (*it)->version()) {
_rs_metas.erase(it);
if (_enable_unique_key_merge_on_write) {
rowset_cache_version_size =
_delete_bitmap->remove_rowset_cache_version((*it)->rowset_id());
}
// there should be only one rowset match the version
break;
} else {
Expand All @@ -895,16 +906,22 @@ void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
}
// put to_add rowsets in _rs_metas.
_rs_metas.insert(_rs_metas.end(), to_add.begin(), to_add.end());
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}

// Use the passing "rs_metas" to replace the rs meta in this tablet meta
// Also clear the _stale_rs_metas because this tablet meta maybe copyied from
// an existing tablet before. Add after revise, only the passing "rs_metas"
// is needed.
void TabletMeta::revise_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas) {
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
_rs_metas = std::move(rs_metas);
_stale_rs_metas.clear();
{
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
_rs_metas = std::move(rs_metas);
_stale_rs_metas.clear();
}
if (_enable_unique_key_merge_on_write) {
_delete_bitmap->clear_rowset_cache_version();
}
}

// This method should call after revise_rs_metas, since new rs_metas might be a subset
Expand Down Expand Up @@ -945,18 +962,7 @@ void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
it++;
}
}
if (_enable_unique_key_merge_on_write &&
rowset_cache_version_size > _rs_metas.size() + _stale_rs_metas.size()) {
std::string err_msg = fmt::format(
". tablet: {}, rowset_cache_version size: {}, "
"_rs_metas size: {}, _stale_rs_metas size: {}",
_tablet_id, rowset_cache_version_size, _rs_metas.size(), _stale_rs_metas.size());
if (config::enable_mow_get_agg_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
}
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}

RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const {
Expand Down Expand Up @@ -986,6 +992,35 @@ Status TabletMeta::set_partition_id(int64_t partition_id) {
return Status::OK();
}

void TabletMeta::clear_stale_rowset() {
_stale_rs_metas.clear();
if (_enable_unique_key_merge_on_write) {
_delete_bitmap->clear_rowset_cache_version();
}
}

void TabletMeta::clear_rowsets() {
_rs_metas.clear();
if (_enable_unique_key_merge_on_write) {
_delete_bitmap->clear_rowset_cache_version();
}
}

void TabletMeta::_check_mow_rowset_cache_version_size(size_t rowset_cache_version_size) {
if (_enable_unique_key_merge_on_write &&
rowset_cache_version_size > _rs_metas.size() + _stale_rs_metas.size()) {
std::string err_msg = fmt::format(
". tablet: {}, rowset_cache_version size: {}, "
"_rs_metas size: {}, _stale_rs_metas size: {}",
_tablet_id, rowset_cache_version_size, _rs_metas.size(), _stale_rs_metas.size());
if (config::enable_mow_get_agg_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
}
}

bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._table_id != b._table_id) return false;
if (a._index_id != b._index_id) return false;
Expand Down Expand Up @@ -1305,6 +1340,11 @@ size_t DeleteBitmap::remove_rowset_cache_version(const RowsetId& rowset_id) {
return _rowset_cache_version.size();
}

void DeleteBitmap::clear_rowset_cache_version() {
std::lock_guard l(_rowset_cache_version_lock);
_rowset_cache_version.clear();
}

DeleteBitmap::Version DeleteBitmap::_get_rowset_cache_version(const BitmapKey& bmk) const {
std::shared_lock l(_rowset_cache_version_lock);
if (auto it = _rowset_cache_version.find(std::get<0>(bmk)); it != _rowset_cache_version.end()) {
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
}

// used for after tablet cloned to clear stale rowset
void clear_stale_rowset() { _stale_rs_metas.clear(); }
void clear_stale_rowset();

void clear_rowsets() { _rs_metas.clear(); }
void clear_rowsets();

// MUST hold EXCLUSIVE `_meta_lock` in belonged Tablet
// `to_add` MUST NOT have overlapped version with `_rs_metas` in tablet meta.
Expand Down Expand Up @@ -301,6 +301,7 @@ class TabletMeta : public MetadataAdder<TabletMeta> {

private:
Status _save_meta(DataDir* data_dir);
void _check_mow_rowset_cache_version_size(size_t rowset_cache_version_size);

// _del_predicates is ignored to compare.
friend bool operator==(const TabletMeta& a, const TabletMeta& b);
Expand Down Expand Up @@ -561,6 +562,8 @@ class DeleteBitmap {
// return the size of the map
size_t remove_rowset_cache_version(const RowsetId& rowset_id);

void clear_rowset_cache_version();

class AggCachePolicy : public LRUCachePolicy {
public:
AggCachePolicy(size_t capacity)
Expand Down
Loading