Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <bthread/countdown_event.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
Expand Down Expand Up @@ -228,7 +229,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
expiration_time = 0;
}

if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
<< ", skip it";
continue;
Expand Down Expand Up @@ -279,8 +280,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 1, 0) ==
WarmUpState::DONE) {
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
rowset_id, st, 1, 0)
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
Expand Down Expand Up @@ -352,8 +354,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 0, 1) ==
WarmUpState::DONE) {
if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN,
rowset_id, st, 0, 1)
.trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
Expand All @@ -373,7 +376,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
tablet->update_rowset_warmup_state_inverted_idx_num(rowset_id, 1);
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, 1);
if (wait) {
wait->add_count();
}
Expand Down
129 changes: 88 additions & 41 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
"file_cache_warm_up_rowset_triggered_by_job_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
"file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
"file_cache_warm_up_rowset_triggered_by_event_driven_num");
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
"file_cache_warm_up_rowset_all_segments_latency");

Expand Down Expand Up @@ -443,8 +445,8 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
if (!warm_up_state_updated) {
VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
<< ") triggerd by sync rowset";
if (!add_rowset_warmup_state_unlocked(
*(rs->rowset_meta()), WarmUpState::TRIGGERED_BY_SYNC_ROWSET)) {
if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
WarmUpTriggerSource::SYNC_ROWSET)) {
LOG(INFO) << "found duplicate warmup task for rowset "
<< rs->rowset_id() << ", skip it";
break;
Expand Down Expand Up @@ -476,7 +478,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
std::chrono::seconds(sleep_time));
}
});
self->complete_rowset_segment_warmup(rowset_meta->rowset_id(), st, 1, 0);
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
Expand Down Expand Up @@ -508,13 +510,13 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
std::chrono::seconds(sleep_time));
// clang-format off
});
self->complete_rowset_segment_warmup(rowset_meta->rowset_id(), st, 0, 1);
self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
};
self->update_rowset_warmup_state_inverted_idx_num_unlocked(rowset_meta->rowset_id(), 1);
self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
g_file_cache_cloud_tablet_submitted_index_num << 1;
g_file_cache_cloud_tablet_submitted_index_size << idx_size;
Expand Down Expand Up @@ -1647,51 +1649,103 @@ Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
std::shared_lock rlock(_meta_lock);
if (!_rowset_warm_up_states.contains(rowset_id)) {
return WarmUpState::NONE;
return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
}
return _rowset_warm_up_states[rowset_id].state;
auto& warmup_info = _rowset_warm_up_states[rowset_id];
warmup_info.update_state();
return warmup_info.state;
}

bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpState state,
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp) {
std::lock_guard wlock(_meta_lock);
return add_rowset_warmup_state_unlocked(rowset, state, start_tp);
return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
}

void CloudTablet::update_rowset_warmup_state_inverted_idx_num(RowsetId rowset_id, int64_t delta) {
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
RowsetId rowset_id, int64_t delta) {
std::lock_guard wlock(_meta_lock);
update_rowset_warmup_state_inverted_idx_num_unlocked(rowset_id, delta);
return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
}

void CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(RowsetId rowset_id,
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
RowsetId rowset_id,
int64_t delta) {
if (!_rowset_warm_up_states.contains(rowset_id)) {
return;
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
return false;
}
if (it->second.state.trigger_source != source) {
// Only the same trigger source can update the state
return false;
}
_rowset_warm_up_states[rowset_id].num_inverted_idx += delta;
it->second.num_inverted_idx += delta;
return true;
}

bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, WarmUpState state,
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp) {
if (_rowset_warm_up_states.contains(rowset.rowset_id())) {
return false;
auto rowset_id = rowset.rowset_id();

// Check if rowset already has warmup state
if (_rowset_warm_up_states.contains(rowset_id)) {
auto existing_state = _rowset_warm_up_states[rowset_id].state;

// For job-triggered warmup (one-time and periodic warmup), allow it to proceed
// except when there's already another job-triggered warmup in progress
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the job-triigered warmup is done? still return false?

if (source == WarmUpTriggerSource::JOB) {
if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
existing_state.progress == WarmUpProgress::DOING) {
// Same job type already in progress, skip to avoid duplicate warmup
return false;
}
} else {
// For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
return false;
}
}
if (state == WarmUpState::TRIGGERED_BY_JOB) {

if (source == WarmUpTriggerSource::JOB) {
g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
} else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
} else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
}
_rowset_warm_up_states[rowset.rowset_id()] = {
.state = state, .num_segments = rowset.num_segments(), .start_tp = start_tp};
} else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
}
_rowset_warm_up_states[rowset_id] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since two different trigger types will share the same states (and the same counter in it), will this replacement of elements in the map interfere with other warmup types with the same rowsetid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only one trigger source can update a rowset's state in any time

.state = {.trigger_source = source,
.progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
: WarmUpProgress::DOING)},
.num_segments = rowset.num_segments(),
.start_tp = start_tp};
return true;
}

WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, Status status,
void CloudTablet::RowsetWarmUpInfo::update_state() {
if (has_finished()) {
g_file_cache_warm_up_rowset_complete_num << 1;
auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_tp)
.count();
g_file_cache_warm_up_rowset_all_segments_latency << cost;
state.progress = WarmUpProgress::DONE;
}
}

WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
RowsetId rowset_id, Status status,
int64_t segment_num,
int64_t inverted_idx_num) {
std::lock_guard wlock(_meta_lock);
if (!_rowset_warm_up_states.contains(rowset_id)) {
return WarmUpState::NONE;
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
}
auto& warmup_info = it->second;
if (warmup_info.state.trigger_source != trigger_source) {
// Only the same trigger source can update the state
return warmup_info.state;
}
VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
if (segment_num > 0) {
Expand All @@ -1706,31 +1760,24 @@ WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, Stat
g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
}
}
_rowset_warm_up_states[rowset_id].done(segment_num, inverted_idx_num);
if (_rowset_warm_up_states[rowset_id].has_finished()) {
g_file_cache_warm_up_rowset_complete_num << 1;
auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() -
_rowset_warm_up_states[rowset_id].start_tp)
.count();
g_file_cache_warm_up_rowset_all_segments_latency << cost;
_rowset_warm_up_states[rowset_id].state = WarmUpState::DONE;
}
return _rowset_warm_up_states[rowset_id].state;
warmup_info.done(segment_num, inverted_idx_num);
return warmup_info.state;
}

bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
return false;
}
return it->second.state == WarmUpState::DONE;
return it->second.state.progress == WarmUpProgress::DONE;
}

void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
_rowset_warm_up_states[rowset_id] = {.state = WarmUpState::DONE,
.num_segments = 1,
.start_tp = std::chrono::steady_clock::now()};
_rowset_warm_up_states[rowset_id] = {
.state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
.progress = WarmUpProgress::DONE},
.num_segments = 1,
.start_tp = std::chrono::steady_clock::now()};
}

#include "common/compile_check_end.h"
Expand Down
34 changes: 26 additions & 8 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,19 @@
namespace doris {

class CloudStorageEngine;
enum class WarmUpState : int;

enum class WarmUpTriggerSource : int { NONE, SYNC_ROWSET, EVENT_DRIVEN, JOB };

enum class WarmUpProgress : int { NONE, DOING, DONE };

struct WarmUpState {
WarmUpTriggerSource trigger_source {WarmUpTriggerSource::NONE};
WarmUpProgress progress {WarmUpProgress::NONE};

bool operator==(const WarmUpState& other) const {
return trigger_source == other.trigger_source && progress == other.progress;
}
};

struct SyncRowsetStats {
int64_t get_remote_rowsets_num {0};
Expand Down Expand Up @@ -326,11 +338,14 @@ class CloudTablet final : public BaseTablet {
// Add warmup state management
WarmUpState get_rowset_warmup_state(RowsetId rowset_id);
bool add_rowset_warmup_state(
const RowsetMeta& rowset, WarmUpState state,
const RowsetMeta& rowset, WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now());
void update_rowset_warmup_state_inverted_idx_num(RowsetId rowset_id, int64_t delta);
void update_rowset_warmup_state_inverted_idx_num_unlocked(RowsetId rowset_id, int64_t delta);
WarmUpState complete_rowset_segment_warmup(RowsetId rowset_id, Status status,
bool update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source, RowsetId rowset_id,
int64_t delta);
bool update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
RowsetId rowset_id, int64_t delta);
WarmUpState complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
RowsetId rowset_id, Status status,
int64_t segment_num, int64_t inverted_idx_num);

bool is_rowset_warmed_up(const RowsetId& rowset_id) const;
Expand All @@ -343,8 +358,8 @@ class CloudTablet final : public BaseTablet {
auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string());
if (_rowset_warm_up_states.contains(rs->rowset_id())) {
tmp += fmt::format(
", state={}, segments_warmed_up={}/{}, inverted_idx_warmed_up={}/{}",
_rowset_warm_up_states.at(rs->rowset_id()).state,
", progress={}, segments_warmed_up={}/{}, inverted_idx_warmed_up={}/{}",
_rowset_warm_up_states.at(rs->rowset_id()).state.progress,
_rowset_warm_up_states.at(rs->rowset_id()).num_segments_warmed_up,
_rowset_warm_up_states.at(rs->rowset_id()).num_segments,
_rowset_warm_up_states.at(rs->rowset_id()).num_inverted_idx_warmed_up,
Expand All @@ -363,7 +378,7 @@ class CloudTablet final : public BaseTablet {
Status sync_if_not_running(SyncRowsetStats* stats = nullptr);

bool add_rowset_warmup_state_unlocked(
const RowsetMeta& rowset, WarmUpState state,
const RowsetMeta& rowset, WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now());

// used by capture_rs_reader_xxx functions
Expand Down Expand Up @@ -438,12 +453,15 @@ class CloudTablet final : public BaseTablet {
void done(int64_t num_segments, int64_t num_inverted_idx) {
num_segments_warmed_up += num_segments;
num_inverted_idx_warmed_up += num_inverted_idx;
update_state();
}

bool has_finished() const {
return (num_segments_warmed_up >= num_segments) &&
(num_inverted_idx_warmed_up >= num_inverted_idx);
}

void update_state();
};
std::unordered_map<RowsetId, RowsetWarmUpInfo> _rowset_warm_up_states;

Expand Down
Loading
Loading