Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");

static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
Expand Down Expand Up @@ -337,6 +339,19 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
// replace existed rowset with `to_add` rowset. This may occur when:
// 1. schema change converts rowsets which have been double written to new tablet
// 2. cumu compaction picks single overlapping input rowset to perform compaction

// add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
std::vector<RowsetSharedPtr> unused_rowsets;
if (auto find_it = _rs_version_map.find(rs->version());
find_it != _rs_version_map.end()) {
DCHECK(find_it->second->rowset_id() != rs->rowset_id())
<< "tablet_id=" << tablet_id()
<< ", rowset_id=" << rs->rowset_id().to_string()
<< ", existed rowset_id=" << find_it->second->rowset_id().to_string();
unused_rowsets.push_back(find_it->second);
}
add_unused_rowsets(unused_rowsets);

_tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
_rs_version_map[rs->version()] = rs;
_tablet_meta->add_rowsets_unchecked({rs});
Expand Down Expand Up @@ -447,9 +462,51 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
}
add_unused_rowsets(expired_rowsets);
return expired_rowsets.size();
}

// Reports whether at least one rowset is currently queued in `_unused_rowsets`.
// The answer is a snapshot taken under `_gc_mutex`; the queue may change as soon
// as the lock is released.
bool CloudTablet::need_remove_unused_rowsets() {
    std::lock_guard<std::mutex> gc_guard(_gc_mutex);
    const bool has_pending_rowsets = !_unused_rowsets.empty();
    return has_pending_rowsets;
}

void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
std::lock_guard<std::mutex> lock(_gc_mutex);
for (const auto& rowset : rowsets) {
_unused_rowsets[rowset->rowset_id()] = rowset;
}
g_unused_rowsets_count << rowsets.size();
}

void CloudTablet::remove_unused_rowsets() {
int64_t removed_rowsets_num = 0;
OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
// it->second is std::shared_ptr<Rowset>
auto&& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
<< rs.use_count() << " references, it cannot be removed";
++it;
continue;
}
rs->clear_cache();
it = _unused_rowsets.erase(it);
g_unused_rowsets_count << -1;
removed_rowsets_num++;
}

if (removed_rowsets_num > 0) {
LOG(INFO) << "tablet_id=" << tablet_id()
<< ", unused_rowset size=" << _unused_rowsets.size()
<< ", removed_rowsets_num=" << removed_rowsets_num
<< ", cost(us)=" << watch.get_elapse_time_us();
}
}

void CloudTablet::update_base_size(const Rowset& rs) {
// Define base rowset as the rowset of version [2-x]
if (rs.start_version() == 2) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ class CloudTablet final : public BaseTablet {
// check that if the delete bitmap in delete bitmap cache has the same cardinality with the expected_delete_bitmap's
Status check_delete_bitmap_cache(int64_t txn_id, DeleteBitmap* expected_delete_bitmap) override;

bool need_remove_unused_rowsets();

void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down Expand Up @@ -320,6 +325,10 @@ class CloudTablet final : public BaseTablet {

// Schema will be merged from all rowsets when sync_rowsets
TabletSchemaSPtr _merged_tablet_schema;

// Rowsets queued for cache cleanup, keyed by rowset id; guarded by _gc_mutex
std::mutex _gc_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
Expand Down
16 changes: 16 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,22 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
LOG_INFO("finish vacuum stale rowsets")
.tag("num_vacuumed", num_vacuumed)
.tag("num_tablets", tablets_to_vacuum.size());

{
LOG_INFO("begin to remove unused rowsets");
std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_unused_rowsets;
tablets_to_remove_unused_rowsets.reserve(_tablet_map->size());
_tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) {
if (t->need_remove_unused_rowsets()) {
tablets_to_remove_unused_rowsets.push_back(t);
}
});
for (auto& t : tablets_to_remove_unused_rowsets) {
t->remove_unused_rowsets();
}
LOG_INFO("finish remove unused rowsets")
.tag("num_tablets", tablets_to_remove_unused_rowsets.size());
}
}

std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
Expand Down
Loading