From 91d650ca0c024826cbfae7bad4554bcb650ecfa7 Mon Sep 17 00:00:00 2001
From: Lei Zhang
Date: Thu, 12 Jun 2025 17:24:02 +0800
Subject: [PATCH] [feat](cloud) Add unused rowset state for CloudTablet

---
Review notes (ignored by `git am`; not part of the commit message):
 * This copy was re-wrapped from a whitespace-collapsed export. Template
   arguments dropped by that export were restored from how the values are
   used in the diff; indentation and blank context lines are reconstructed
   and should be checked against the tree before applying.
 * CloudTablet::add_unused_rowsets() now increments g_unused_rowsets_count
   only for rowset ids that were not already tracked, because
   remove_unused_rowsets() decrements it once per map entry.
 * Doc comments were added to the new functions and members, so hunk sizes
   and the diffstat differ from the original commit; the blob ids on the
   `index` lines are still those of the original commit.

 be/src/cloud/cloud_tablet.cpp     | 68 +++++++++++++++++++++++++++++++
 be/src/cloud/cloud_tablet.h       | 12 ++++++
 be/src/cloud/cloud_tablet_mgr.cpp | 17 ++++++++
 3 files changed, 97 insertions(+)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 039cdb85f22e0e..c044b8361b7af3 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -56,6 +56,9 @@ namespace doris {
 #include "common/compile_check_begin.h"
 using namespace ErrorCode;
 
+// Number of rowsets currently held in CloudTablet::_unused_rowsets, summed over all tablets.
+bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
+
 static constexpr int LOAD_INITIATOR_ID = -1;
 
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
@@ -337,6 +340,19 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                 // replace existed rowset with `to_add` rowset. This may occur when:
                 //  1. schema change converts rowsets which have been double written to new tablet
                 //  2. cumu compaction picks single overlapping input rowset to perform compaction
+
+                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
+                std::vector<RowsetSharedPtr> unused_rowsets;
+                if (auto find_it = _rs_version_map.find(rs->version());
+                    find_it != _rs_version_map.end()) {
+                    DCHECK(find_it->second->rowset_id() != rs->rowset_id())
+                            << "tablet_id=" << tablet_id()
+                            << ", rowset_id=" << rs->rowset_id().to_string()
+                            << ", existed rowset_id=" << find_it->second->rowset_id().to_string();
+                    unused_rowsets.push_back(find_it->second);
+                }
+                add_unused_rowsets(unused_rowsets);
+
                 _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
                 _rs_version_map[rs->version()] = rs;
                 _tablet_meta->add_rowsets_unchecked({rs});
@@ -447,9 +463,61 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
     if (config::enable_mow_verbose_log) {
         LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
     }
+    add_unused_rowsets(expired_rowsets);
     return expired_rowsets.size();
 }
 
+// Returns true while this tablet still tracks rowsets awaiting cache cleanup.
+bool CloudTablet::need_remove_unused_rowsets() {
+    std::lock_guard<std::mutex> lock(_gc_mutex);
+    return !_unused_rowsets.empty();
+}
+
+// Registers `rowsets` for deferred cleanup, keyed by rowset id. Re-adding an id that is
+// already tracked replaces the stored pointer without bumping the counter again.
+void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+    std::lock_guard<std::mutex> lock(_gc_mutex);
+    int64_t newly_tracked = 0;
+    for (const auto& rowset : rowsets) {
+        // remove_unused_rowsets() decrements the counter once per map entry, so only a
+        // fresh insertion may increment it; otherwise the metric drifts upward.
+        if (_unused_rowsets.insert_or_assign(rowset->rowset_id(), rowset).second) {
+            ++newly_tracked;
+        }
+    }
+    g_unused_rowsets_count << newly_tracked;
+}
+
+// Clears the cache of every tracked rowset that nothing else references and stops
+// tracking it; rowsets that are still referenced stay in the map for a later call.
+void CloudTablet::remove_unused_rowsets() {
+    int64_t removed_rowsets_num = 0;
+    OlapStopWatch watch;
+    std::lock_guard<std::mutex> lock(_gc_mutex);
+    // 1. remove unused rowsets's cache data and delete bitmap
+    for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
+        // it->second is std::shared_ptr<Rowset>
+        auto&& rs = it->second;
+        if (rs.use_count() > 1) {
+            LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
+                         << rs.use_count() << " references, it cannot be removed";
+            ++it;
+            continue;
+        }
+        rs->clear_cache();
+        it = _unused_rowsets.erase(it);
+        g_unused_rowsets_count << -1;
+        removed_rowsets_num++;
+    }
+
+    if (removed_rowsets_num > 0) {
+        LOG(INFO) << "tablet_id=" << tablet_id()
+                  << ", unused_rowset size=" << _unused_rowsets.size()
+                  << ", removed_rowsets_num=" << removed_rowsets_num
+                  << ", cost(us)=" << watch.get_elapse_time_us();
+    }
+}
+
 void CloudTablet::update_base_size(const Rowset& rs) {
     // Define base rowset as the rowset of version [2-x]
     if (rs.start_version() == 2) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 6c41af26d77cce..ee3fdc35c6a2c5 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -261,6 +261,14 @@ class CloudTablet final : public BaseTablet {
     // check that if the delete bitmap in delete bitmap cache has the same cardinality with the expected_delete_bitmap's
     Status check_delete_bitmap_cache(int64_t txn_id, DeleteBitmap* expected_delete_bitmap) override;
 
+    // True if this tablet has rowsets queued in _unused_rowsets.
+    bool need_remove_unused_rowsets();
+
+    // Queue `rowsets` for deferred cache cleanup by remove_unused_rowsets().
+    void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
+    // Clear the cache of queued rowsets that are no longer referenced elsewhere.
+    void remove_unused_rowsets();
+
 private:
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by version
     void update_base_size(const Rowset& rs);
@@ -320,6 +328,10 @@ class CloudTablet final : public BaseTablet {
 
     // Schema will be merged from all rowsets when sync_rowsets
     TabletSchemaSPtr _merged_tablet_schema;
+
+    // Rowsets awaiting cache cleanup, keyed by rowset id; guarded by _gc_mutex.
+    std::mutex _gc_mutex;
+    std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
 };
 
 using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp
index 17d1b98a95fc77..aa4050f073f246 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -262,6 +262,23 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
     LOG_INFO("finish vacuum stale rowsets")
             .tag("num_vacuumed", num_vacuumed)
             .tag("num_tablets", tablets_to_vacuum.size());
+
+    // Collect the tablets that have queued unused rowsets first, then clean each one up.
+    {
+        LOG_INFO("begin to remove unused rowsets");
+        std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_unused_rowsets;
+        tablets_to_remove_unused_rowsets.reserve(_tablet_map->size());
+        _tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) {
+            if (t->need_remove_unused_rowsets()) {
+                tablets_to_remove_unused_rowsets.push_back(t);
+            }
+        });
+        for (auto& t : tablets_to_remove_unused_rowsets) {
+            t->remove_unused_rowsets();
+        }
+        LOG_INFO("finish remove unused rowsets")
+                .tag("num_tablets", tablets_to_remove_unused_rowsets.size());
+    }
 }
 
 std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {