Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
int64_t max_version = cloud_tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked(
{_output_rowset->version().second + 1, max_version}, &tmp_rowsets));
auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked(
{_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {}));
tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
int64_t cur_version = it->rowset_meta()->start_version();
Expand Down
18 changes: 8 additions & 10 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
&rs_splits,
{.skip_missing_version = false,
{.skip_missing_versions = false,
.enable_prefer_cached_rowset = false,
.query_freshness_tolerance_ms = -1}));
}
Expand Down Expand Up @@ -199,7 +199,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr();
reader_context.version = Version(0, start_resp.alter_version());
std::vector<uint32_t> cluster_key_idxes;
if (!_base_tablet_schema->cluster_key_uids().empty()) {
Expand Down Expand Up @@ -509,22 +509,21 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}

// step 1, process incremental rowset without delete bitmap update lock
std::vector<RowsetSharedPtr> incremental_rowsets;
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
<< "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
if (max_version >= start_calc_delete_bitmap_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
for (auto rowset : incremental_rowsets) {
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}
Expand All @@ -540,15 +539,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1 << "-"
<< new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
std::vector<RowsetSharedPtr> new_incremental_rowsets;
if (new_max_version > max_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, &new_incremental_rowsets));
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
for (auto rowset : new_incremental_rowsets) {
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}
Expand Down
107 changes: 40 additions & 67 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -146,83 +147,53 @@ std::string CloudTablet::tablet_path() const {
return "";
}

Status CloudTablet::capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const {
Versions version_path;
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
// Check no missed versions or req version is merged
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
return st;
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return _capture_consistent_rowsets_unlocked(version_path, rowsets);
}

Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
const CaptureRsReaderOptions& opts) {
if (opts.query_freshness_tolerance_ms > 0) {
return capture_rs_readers_with_freshness_tolerance(spec_version, rs_splits,
opts.query_freshness_tolerance_ms);
} else if (opts.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) {
return capture_rs_readers_prefer_cache(spec_version, rs_splits);
}
return capture_rs_readers_internal(spec_version, rs_splits);
}

Status CloudTablet::capture_rs_readers_internal(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits) {
const CaptureRowsetOps& opts) {
DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
Versions version_path;
std::shared_lock rlock(_meta_lock);
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
rlock.unlock(); // avoid logging in lock range
// Check no missed versions or req version is merged
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
st.append(" versions are already compacted, ");
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
// clang-format off
LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }();
// clang-format on
return st;
*rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions}));
return Status::OK();
}

[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
const Version& version_range, const CaptureRowsetOps& options) const {
if (options.query_freshness_tolerance_ms > 0) {
return capture_versions_with_freshness_tolerance(version_range, options);
} else if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) {
return capture_versions_prefer_cache(version_range);
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return capture_rs_readers_unlocked(version_path, rs_splits);
return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
}

Status CloudTablet::capture_rs_readers_prefer_cache(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits) {
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
const Version& spec_version) const {
g_capture_prefer_cache_count << 1;
Versions version_path;
std::shared_lock rlock(_meta_lock);
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_prefer_cache(
auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
spec_version, version_path,
[&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); }));
[&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); });
if (!st.ok()) {
return ResultError(st);
}
int64_t path_max_version = version_path.back().second;
VLOG_DEBUG << fmt::format(
"[verbose] CloudTablet::capture_rs_readers_prefer_cache, capture path: {}, "
"[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
"tablet_id={}, spec_version={}, path_max_version={}",
fmt::join(version_path | std::views::transform([](const auto& version) {
return fmt::format("{}", version.to_string());
}),
", "),
tablet_id(), spec_version.to_string(), path_max_version);
return capture_rs_readers_unlocked(version_path, rs_splits);
return version_path;
}

bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) {
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
if (start_version > end_version) {
return false;
}
Expand All @@ -247,27 +218,29 @@ bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t en
return is_rowset_warmed_up(rs->rowset_id());
};

Status CloudTablet::capture_rs_readers_with_freshness_tolerance(
const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
int64_t query_freshness_tolerance_ms) {
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
const Version& spec_version, const CaptureRowsetOps& options) const {
g_capture_with_freshness_tolerance_count << 1;
using namespace std::chrono;
auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms;
auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms);
// find a version path where every edge(rowset) has been warmuped
Versions version_path;
std::shared_lock rlock(_meta_lock);
if (enable_unique_key_merge_on_write()) {
// For merge-on-write table, newly generated delete bitmap marks will be on the rowsets which are in newest layout.
// So we can ony capture rowsets which are in newest data layout. Otherwise there may be data correctness issue.
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
spec_version, version_path, [&](int64_t start, int64_t end) {
return rowset_is_warmed_up_unlocked(start, end);
}));
RETURN_IF_ERROR_RESULT(
_timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
spec_version, version_path, [&](int64_t start, int64_t end) {
return rowset_is_warmed_up_unlocked(start, end);
}));
} else {
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator(
spec_version, version_path, [&](int64_t start, int64_t end) {
return rowset_is_warmed_up_unlocked(start, end);
}));
RETURN_IF_ERROR_RESULT(
_timestamped_version_tracker.capture_consistent_versions_with_validator(
spec_version, version_path, [&](int64_t start, int64_t end) {
return rowset_is_warmed_up_unlocked(start, end);
}));
}
int64_t path_max_version = version_path.back().second;
auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) -> bool {
Expand Down Expand Up @@ -309,17 +282,17 @@ Status CloudTablet::capture_rs_readers_with_freshness_tolerance(
g_capture_with_freshness_tolerance_fallback_count << 1;
// if there exists a rowset which satisfies freshness tolerance and its start version is larger than the path max version
// but has not been warmuped up yet, fallback to capture rowsets as usual
return capture_rs_readers_internal(spec_version, rs_splits);
return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
}
VLOG_DEBUG << fmt::format(
"[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, capture path: {}, "
"[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
"tablet_id={}, spec_version={}, path_max_version={}",
fmt::join(version_path | std::views::transform([](const auto& version) {
return fmt::format("{}", version.to_string());
}),
", "),
tablet_id(), spec_version.to_string(), path_max_version);
return capture_rs_readers_unlocked(version_path, rs_splits);
return version_path;
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
Expand Down
28 changes: 12 additions & 16 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ class CloudTablet final : public BaseTablet {
bool vertical) override;

Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
const CaptureRsReaderOptions& opts) override;
Status capture_rs_readers_internal(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits);
const CaptureRowsetOps& opts) override;

// Capture rowset readers with cache preference optimization.
[[nodiscard]] Result<std::vector<Version>> capture_consistent_versions_unlocked(
const Version& version_range, const CaptureRowsetOps& options) const override;

// Capture versions with cache preference optimization.
// This method prioritizes using cached/warmed-up rowsets when building version paths,
// avoiding cold data reads when possible. It uses capture_consistent_versions_prefer_cache
// to find a consistent version path that prefers already warmed-up rowsets.
Status capture_rs_readers_prefer_cache(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits);
Result<std::vector<Version>> capture_versions_prefer_cache(const Version& spec_version) const;

// Capture rowset readers with query freshness tolerance.
// Capture versions with query freshness tolerance.
// This method finds a consistent version path where all rowsets are warmed up,
// but allows fallback to normal capture if there are newer rowsets that should be
// visible (based on freshness tolerance) but haven't been warmed up yet.
Expand All @@ -102,16 +102,12 @@ class CloudTablet final : public BaseTablet {
// data hasn't been warmed up yet. This can cause different tablets in the same query
// to read from different versions, potentially leading to inconsistent query results.
//
// @param query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that
// @param options.query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that
// became visible within this time range (after current_time - query_freshness_tolerance_ms)
// can be skipped if not warmed up. However, if older rowsets (before this time point)
// are not warmed up, the method will fallback to normal capture.
Status capture_rs_readers_with_freshness_tolerance(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
int64_t query_freshness_tolerance_ms);

Status capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;
Result<std::vector<Version>> capture_versions_with_freshness_tolerance(
const Version& spec_version, const CaptureRowsetOps& options) const;

size_t tablet_footprint() override {
return _approximate_data_size.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -352,7 +348,7 @@ class CloudTablet final : public BaseTablet {

void add_warmed_up_rowset(const RowsetId& rowset_id);

std::string rowset_warmup_digest() {
std::string rowset_warmup_digest() const {
std::string res;
auto add_log = [&](const RowsetSharedPtr& rs) {
auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string());
Expand Down Expand Up @@ -382,7 +378,7 @@ class CloudTablet final : public BaseTablet {
std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now());

// used by capture_rs_reader_xxx functions
bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version);
bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const;

CloudStorageEngine& _engine;

Expand Down
17 changes: 11 additions & 6 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
bool sync_delete_bitmap,
SyncRowsetStats* sync_stats,
bool force_use_cache) {
bool local_only) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
Expand All @@ -177,12 +177,17 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);

if (handle == nullptr && force_use_cache) {
return ResultError(
Status::InternalError("failed to get cloud tablet from cache {}", tablet_id));
}

if (handle == nullptr) {
if (local_only) {
LOG(INFO) << "tablet=" << tablet_id
<< "does not exists in local tablet cache, because param local_only=true, "
"treat it as an error";
return ResultError(Status::InternalError(
"tablet={} does not exists in local tablet cache, because param "
"local_only=true, "
"treat it as an error",
tablet_id));
}
if (sync_stats) {
++sync_stats->tablet_meta_cache_miss;
}
Expand Down
Loading
Loading