Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,11 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}

// if the rowset is not used by any query, we can recycle its cached data early.
recycle_cached_data(expired_rowsets);
auto recycled_rowsets = recycle_cached_data(expired_rowsets);
if (!recycled_rowsets.empty()) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(tablet_id(), recycled_rowsets);
}
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
}
Expand Down Expand Up @@ -609,15 +613,11 @@ void CloudTablet::remove_unused_rowsets() {
}

{
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;
std::vector<RecycledRowsets> recycled_rowsets;

for (auto& rs : removed_rowsets) {
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
Expand All @@ -627,10 +627,10 @@ void CloudTablet::remove_unused_rowsets() {
g_file_cache_recycle_cached_data_index_num << index_names.size();
}

if (removed_rowsets.size() > 0) {
if (recycled_rowsets.size() > 0) {
auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(tablet_id(), rowset_ids, num_segments, index_file_names);
manager.recycle_cache(tablet_id(), recycled_rowsets);
}
}

Expand Down Expand Up @@ -675,14 +675,17 @@ void CloudTablet::update_base_size(const Rowset& rs) {
}

void CloudTablet::clear_cache() {
CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
if (!recycled_rowsets.empty()) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(tablet_id(), recycled_rowsets);
}
_engine.tablet_mgr().erase_tablet(tablet_id());
}

void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
const std::vector<RowsetSharedPtr>& rowsets) {
std::vector<RecycledRowsets> recycled_rowsets;
for (const auto& rs : rowsets) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
Expand All @@ -691,10 +694,9 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset
continue;
}
rs->clear_cache();
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);

int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
Expand All @@ -703,11 +705,7 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset
g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
g_file_cache_recycle_cached_data_index_num << index_names.size();
}
if (!rowsets.empty()) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(rowsets.front()->rowset_meta()->tablet_id(), rowset_ids, num_segments,
index_file_names);
}
return recycled_rowsets;
}

void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
Expand Down
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ struct SyncOptions {
int64_t query_version = -1;
};

struct RecycledRowsets {
RowsetId rowset_id;
int64_t num_segments;
std::vector<std::string> index_file_names;
};

class CloudTablet final : public BaseTablet {
public:
CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta);
Expand Down Expand Up @@ -284,7 +290,10 @@ class CloudTablet final : public BaseTablet {
void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);
// For each given rowset not in active use, clears its file cache and returns its
// ID, segment count, and index file names as RecycledRowsets entries.
static std::vector<RecycledRowsets> recycle_cached_data(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment to describe the behavior, what is the returned value and the input param

const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
Expand Down
26 changes: 10 additions & 16 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,36 +581,30 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
}
}

void CloudWarmUpManager::recycle_cache(
int64_t tablet_id, const std::vector<RowsetId>& rowset_ids,
const std::vector<int64_t>& num_segments,
const std::vector<std::vector<std::string>>& index_file_names) {
LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" << rowset_ids.size();
void CloudWarmUpManager::recycle_cache(int64_t tablet_id,
const std::vector<RecycledRowsets>& rowsets) {
LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" << rowsets.size();
auto replicas = get_replica_info(tablet_id);
if (replicas.empty()) {
return;
}
if (rowset_ids.size() != num_segments.size()) {
LOG(WARNING) << "recycle_cache: rowset_ids size mismatch with num_segments";
return;
}

PRecycleCacheRequest request;
for (int i = 0; i < rowset_ids.size(); i++) {
for (const auto& rowset : rowsets) {
RecycleCacheMeta* meta = request.add_cache_metas();
meta->set_tablet_id(tablet_id);
meta->set_rowset_id(rowset_ids[i].to_string());
meta->set_num_segments(num_segments[i]);
for (const auto& name : index_file_names[i]) {
meta->set_rowset_id(rowset.rowset_id.to_string());
meta->set_num_segments(rowset.num_segments);
for (const auto& name : rowset.index_file_names) {
meta->add_index_file_names(name);
}
g_file_cache_recycle_cache_requested_segment_num << num_segments[i];
g_file_cache_recycle_cache_requested_index_num << index_file_names[i].size();
g_file_cache_recycle_cache_requested_segment_num << rowset.num_segments;
g_file_cache_recycle_cache_requested_index_num << rowset.index_file_names.size();
}
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
for (auto& replica : replicas) {
// send sync request
std::string host = replica.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(replica.host)) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "gen_cpp/BackendService.h"

Expand Down Expand Up @@ -73,9 +74,7 @@ class CloudWarmUpManager {

void warm_up_rowset(RowsetMeta& rs_meta);

void recycle_cache(int64_t tablet_id, const std::vector<RowsetId>& rowset_ids,
const std::vector<int64_t>& num_segments,
const std::vector<std::vector<std::string>>& index_file_names);
void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

private:
void handle_jobs();
Expand Down
Loading