Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

namespace doris {

bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
"file_cache_warm_up_cache_async_submitted_segment_num");

CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
: BaseBackendService(exec_env), _engine(engine) {}

Expand Down Expand Up @@ -93,8 +96,15 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
st = manager.check_and_set_job_id(request.job_id);
if (!st) {
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
if (st.ok()) {
break;
}
} else {
st = manager.check_and_set_job_id(request.job_id);
}
if (!st.ok()) {
LOG_WARNING("SET_JOB failed.").error(st);
break;
}
Expand Down Expand Up @@ -140,7 +150,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "CLEAR_JOB")
.tag("job_id", request.job_id);
st = manager.clear_job(request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event, /* clear: */ true);
} else {
st = manager.clear_job(request.job_id);
}
break;
}
default:
Expand Down Expand Up @@ -179,6 +193,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
PGetFileCacheMetaResponse brpc_response;
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
Expand Down
266 changes: 266 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions be/src/cloud/cloud_internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class CloudInternalServiceImpl final : public PInternalService {
PGetFileCacheMetaResponse* response,
google::protobuf::Closure* done) override;

void warm_up_rowset(google::protobuf::RpcController* controller,
const PWarmUpRowsetRequest* request, PWarmUpRowsetResponse* response,
google::protobuf::Closure* done) override;

void recycle_cache(google::protobuf::RpcController* controller,
const PRecycleCacheRequest* request, PRecycleCacheResponse* response,
google::protobuf::Closure* done) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "cloud/schema_cloud_dictionary_cache.h"
Expand Down Expand Up @@ -968,7 +969,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string
return st;
}

Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
Expand Down Expand Up @@ -1010,6 +1011,8 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string&
RETURN_IF_ERROR(
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta);
return st;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class CloudMetaMgr {
Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
163 changes: 135 additions & 28 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "common/config.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
Expand Down Expand Up @@ -62,6 +63,30 @@ bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");

static constexpr int LOAD_INITIATOR_ID = -1;

bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
"file_cache_cloud_tablet_submitted_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
"file_cache_cloud_tablet_submitted_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
"file_cache_cloud_tablet_submitted_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
"file_cache_cloud_tablet_submitted_index_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
"file_cache_cloud_tablet_finished_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
"file_cache_cloud_tablet_finished_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
"file_cache_cloud_tablet_finished_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
"file_cache_cloud_tablet_finished_index_num");

bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
"file_cache_recycle_cached_data_segment_num");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
"file_cache_recycle_cached_data_segment_size");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
"file_cache_recycle_cached_data_index_num");

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}

Expand Down Expand Up @@ -267,48 +292,83 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
// clang-format off
g_file_cache_cloud_tablet_submitted_segment_num << 1;
if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
g_file_cache_cloud_tablet_submitted_segment_size
<< rs->rowset_meta()->segment_file_size(seg_id);
}
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
.path = storage_resource.value()->remote_segment_path(*rowset_meta,
seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.download_done {},
.download_done {[](Status st) {
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
});

auto download_idx_file = [&](const io::Path& idx_path) {
auto download_idx_file = [&](const io::Path& idx_path, int64_t idx_size) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_size = idx_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.download_done {},
.download_done {[](Status st) {
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
g_file_cache_cloud_tablet_submitted_index_num << 1;
g_file_cache_cloud_tablet_submitted_index_size << idx_size;
};
// clang-format on
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
std::unordered_map<int64_t, int64_t> index_size_map;
auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
for (const auto& info : inverted_index_info.index_info()) {
if (info.index_file_size() != -1) {
index_size_map[info.index_id()] = info.index_file_size();
} else {
VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
<< ", index_id " << info.index_id();
}
}
for (const auto& index : schema_ptr->inverted_indexes()) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
download_idx_file(idx_path);
download_idx_file(idx_path, index_size_map[index->index_id()]);
}
} else {
if (schema_ptr->has_inverted_index()) {
auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
int64_t idx_size = 0;
if (inverted_index_info.has_index_size()) {
idx_size = inverted_index_info.index_size();
} else {
VLOG_DEBUG << "index_size is not set for segment " << seg_id;
}
auto idx_path = storage_resource.value()->remote_idx_v2_path(
*rowset_meta, seg_id);
download_idx_file(idx_path);
download_idx_file(idx_path, idx_size);
}
}
}
Expand Down Expand Up @@ -507,25 +567,53 @@ void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets
}

void CloudTablet::remove_unused_rowsets() {
int64_t removed_rowsets_num = 0;
int64_t removed_delete_bitmap_num = 0;
std::vector<std::shared_ptr<Rowset>> removed_rowsets;
OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
// it->second is std::shared_ptr<Rowset>
auto&& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
<< rs.use_count() << " references, it cannot be removed";
++it;
continue;

{
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
auto& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
<< " has " << rs.use_count() << " references, it cannot be removed";
++it;
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
rs->clear_cache();
removed_rowsets.push_back(std::move(rs));
g_unused_rowsets_count << -1;
it = _unused_rowsets.erase(it);
}
}

{
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;

for (auto& rs : removed_rowsets) {
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
}
g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
g_file_cache_recycle_cached_data_index_num << index_names.size();
}

if (removed_rowsets.size() > 0) {
auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(tablet_id(), rowset_ids, num_segments, index_file_names);
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
rs->clear_cache();
it = _unused_rowsets.erase(it);
g_unused_rowsets_count << -1;
removed_rowsets_num++;
}

// 2. remove delete bitmap of pre rowsets
Expand All @@ -552,7 +640,7 @@ void CloudTablet::remove_unused_rowsets() {

LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
<< ", removed_rowsets_num=" << removed_rowsets_num
<< ", removed_rowsets_num=" << removed_rowsets.size()
<< ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
<< ", cost(us)=" << watch.get_elapse_time_us();
}
Expand All @@ -570,14 +658,33 @@ void CloudTablet::clear_cache() {
}

void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;
for (const auto& rs : rowsets) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
return;
continue;
}
rs->clear_cache();
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
}
g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
g_file_cache_recycle_cached_data_index_num << index_names.size();
}
if (!rowsets.empty()) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(rowsets.front()->rowset_meta()->tablet_id(), rowset_ids, num_segments,
index_file_names);
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

// check that if the delete bitmap in delete bitmap cache has the same cardinality with the expected_delete_bitmap's
Status check_delete_bitmap_cache(int64_t txn_id, DeleteBitmap* expected_delete_bitmap) override;

Expand All @@ -273,6 +271,8 @@ class CloudTablet final : public BaseTablet {
void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down
Loading
Loading