Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
92d2177
in one
bobhan1 Aug 20, 2025
060c745
add option enable_prefer_cached_rowset
bobhan1 Aug 21, 2025
f42b067
set visible ts for rowset in lazy committer and sub txn commit'
bobhan1 Aug 21, 2025
94af6df
change visible_time_ms to visible_ts_ms
bobhan1 Aug 21, 2025
e823317
add callback
bobhan1 Aug 21, 2025
a9db013
add query prefer cache impl
bobhan1 Aug 21, 2025
9d2ef2f
add user property for query_freshness_tolerance_ms
bobhan1 Aug 21, 2025
5a56cab
add enable_prefer_cached_rowset session var and user property
bobhan1 Aug 21, 2025
78aef93
add capture_consistent_versions_prefer_cache
bobhan1 Aug 22, 2025
97174cf
fix typo
bobhan1 Aug 22, 2025
d55db6d
add session var property case
bobhan1 Aug 22, 2025
561b48e
tmp add cases
bobhan1 Aug 22, 2025
aaa52ce
fix test_query_freshness_tolerance
bobhan1 Aug 22, 2025
2c8a736
fix test_enable_prefer_cached_rowset case
bobhan1 Aug 22, 2025
cd6726f
update
bobhan1 Aug 24, 2025
9b65e96
add bvar for rowset warmup task
bobhan1 Sep 5, 2025
b0e23bb
update case
bobhan1 Sep 5, 2025
4e87180
fix test_read_cluster_var_property case
bobhan1 Sep 8, 2025
7ec1fd4
move cases
bobhan1 Sep 8, 2025
d8cdd86
fix
bobhan1 Sep 8, 2025
e116165
tmo
bobhan1 Sep 9, 2025
2e68f6b
consider inverted idx file
bobhan1 Sep 10, 2025
1aa0b8e
t,p
bobhan1 Sep 10, 2025
e256bee
print stack
bobhan1 Sep 10, 2025
29f872f
tmp
bobhan1 Sep 11, 2025
b9b3dda
update
bobhan1 Sep 12, 2025
9e315ab
update
bobhan1 Sep 12, 2025
1dd2406
add metrics exclude warmup
bobhan1 Sep 12, 2025
efe1928
update
bobhan1 Sep 12, 2025
9a4e933
fix
bobhan1 Sep 15, 2025
12932b9
add log
bobhan1 Sep 16, 2025
7e3cfc6
add log
bobhan1 Sep 16, 2025
8dca3c1
update
bobhan1 Sep 16, 2025
42534cb
add comment
bobhan1 Sep 17, 2025
6bb6fb7
Merge branch 'master' into query-freshness-session-var
dataroaring Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 56 additions & 36 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "util/debug_points.h"

namespace doris {
#include "common/compile_check_avoid_begin.h"
Expand Down Expand Up @@ -190,6 +191,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
continue;
}
int64_t tablet_id = rs_meta.tablet_id();
auto rowset_id = rs_meta.rowset_id();
bool local_only = !(request->has_skip_existence_check() && request->skip_existence_check());
auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = */ false,
/* sync_delete_bitmap = */ true,
Expand All @@ -216,7 +218,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
g_file_cache_warm_up_rowset_request_to_handle_slow_count << 1;
LOG(INFO) << "warm up rowset (request to handle) took " << handle_ts - request_ts
<< " us, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id().to_string();
<< ", rowset_id: " << rowset_id.to_string();
}
int64_t expiration_time =
tablet_meta->ttl_seconds() == 0 || rs_meta.newest_write_timestamp() <= 0
Expand All @@ -227,16 +229,26 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}

if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rs_meta.rowset_id()
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
<< ", skip it";
continue;
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
auto download_done = [&, tablet_id = rs_meta.tablet_id(),
rowset_id = rs_meta.rowset_id().to_string(),
segment_size = rs_meta.segment_file_size(segment_id),
wait](Status st) {
auto segment_size = rs_meta.segment_file_size(segment_id);
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
rowset_id.to_string(), version.to_string(), sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
st = Status::InternalError("injected error");
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
tablet_id, rowset_id.to_string(), st.to_string());
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
Expand All @@ -250,25 +262,27 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rs_meta.rowset_id(), st) ==
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 1, 0) ==
WarmUpState::DONE) {
VLOG_DEBUG << "warmup rowset " << rs_meta.version() << "(" << rowset_id
<< ") completed";
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
if (wait) {
wait->signal();
Expand All @@ -277,31 +291,35 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c

io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = rs_meta.segment_file_size(segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = rs_meta.segment_file_size(segment_id),
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size
<< rs_meta.segment_file_size(segment_id);
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
if (wait) {
wait->add_count();
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);

auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=, tablet_id = rs_meta.tablet_id(),
rowset_id = rs_meta.rowset_id().to_string()](Status st) {
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
"[verbose] block download for rowset={}, inverted index "
"file={}, sleep={}",
rowset_id.to_string(), index_path, sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
Expand All @@ -318,14 +336,14 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took "
<< now_ts - handle_ts << " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
} else {
Expand All @@ -334,6 +352,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 0, 1) ==
WarmUpState::DONE) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
if (wait) {
wait->signal();
}
Expand All @@ -342,18 +365,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.ctx = {.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;

tablet->update_rowset_warmup_state_inverted_idx_num(rowset_id, 1);
if (wait) {
wait->add_count();
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
if (request.alter_version > 1) {
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
&rs_splits, false));
&rs_splits,
{.skip_missing_version = false,
.enable_prefer_cached_rowset = false,
.query_freshness_tolerance_ms = -1}));
}
Defer defer2 {[&]() {
_new_tablet->set_alter_version(-1);
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
_cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
_startup_timepoint = std::chrono::system_clock::now();
}

CloudStorageEngine::~CloudStorageEngine() {
Expand Down
13 changes: 13 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <chrono>
#include <memory>
#include <mutex>

Expand Down Expand Up @@ -161,6 +162,16 @@ class CloudStorageEngine final : public BaseStorageEngine {

void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact);

std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const {
return _startup_timepoint;
}

#ifdef BE_TEST
void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
_startup_timepoint = tp;
}
#endif

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
Expand Down Expand Up @@ -238,6 +249,8 @@ class CloudStorageEngine final : public BaseStorageEngine {

EngineOptions _options;
std::mutex _store_lock;

std::chrono::time_point<std::chrono::system_clock> _startup_timepoint;
};

} // namespace doris
Loading
Loading