diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 382360bed3af60..15160cfabd497d 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -349,8 +349,9 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t int64_t max_version = cloud_tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { int64_t cur_version = it->rowset_meta()->start_version(); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 7b78033b8e3486..df59a78a58356f 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -150,7 +150,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque // [0-1] is a placeholder rowset, no need to convert RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()}, &rs_splits, - {.skip_missing_version = false, + {.skip_missing_versions = false, .enable_prefer_cached_rowset = false, .query_freshness_tolerance_ms = -1})); } @@ -199,7 +199,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr(); reader_context.version = Version(0, start_resp.alter_version()); std::vector cluster_key_idxes; if (!_base_tablet_schema->cluster_key_uids().empty()) { @@ -509,22 +509,21 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, } // step 1, process incremental rowset without delete bitmap update lock - std::vector incremental_rowsets; RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); int64_t max_version = tmp_tablet->max_version().second; LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); if (max_version >= start_calc_delete_bitmap_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {})); DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", DBUG_BLOCK); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } @@ -540,15 +539,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets with lock, version: " << max_version + 1 << "-" << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); - std::vector new_incremental_rowsets; if (new_max_version > max_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &new_incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : new_incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index ba404d9138b88e..913f8127bd04d5 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -47,6 +47,7 @@ #include "cpp/sync_point.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" @@ -146,83 +147,53 @@ std::string CloudTablet::tablet_path() const { return ""; } -Status CloudTablet::capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const { - Versions version_path; - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - return st; - } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return _capture_consistent_rowsets_unlocked(version_path, rowsets); -} - Status CloudTablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) { - if (opts.query_freshness_tolerance_ms > 0) { - return capture_rs_readers_with_freshness_tolerance(spec_version, rs_splits, - opts.query_freshness_tolerance_ms); - } else if (opts.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) { - return capture_rs_readers_prefer_cache(spec_version, rs_splits); - } - return capture_rs_readers_internal(spec_version, rs_splits); -} - -Status CloudTablet::capture_rs_readers_internal(const Version& spec_version, - std::vector* rs_splits) { + const CaptureRowsetOps& opts) { DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", { LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id()); return Status::Error(-230, "injected error"); }); - Versions version_path; std::shared_lock rlock(_meta_lock); - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - rlock.unlock(); // avoid logging in lock range - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - st.append(" versions are already compacted, "); - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - // clang-format off - LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }(); - // clang-format on - return st; + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions})); + return Status::OK(); +} + +[[nodiscard]] Result> CloudTablet::capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + if (options.query_freshness_tolerance_ms > 0) { + return capture_versions_with_freshness_tolerance(version_range, options); + } else if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) { + return capture_versions_prefer_cache(version_range); } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return capture_rs_readers_unlocked(version_path, rs_splits); + return BaseTablet::capture_consistent_versions_unlocked(version_range, options); } -Status CloudTablet::capture_rs_readers_prefer_cache(const Version& spec_version, - std::vector* rs_splits) { +Result> CloudTablet::capture_versions_prefer_cache( + const Version& spec_version) const { g_capture_prefer_cache_count << 1; Versions version_path; std::shared_lock rlock(_meta_lock); - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_prefer_cache( + auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache( spec_version, version_path, - [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); })); + [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); }); + if (!st.ok()) { + return ResultError(st); + } int64_t path_max_version = version_path.back().second; VLOG_DEBUG << fmt::format( - "[verbose] CloudTablet::capture_rs_readers_prefer_cache, capture path: {}, " + "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, " "tablet_id={}, spec_version={}, path_max_version={}", fmt::join(version_path | std::views::transform([](const auto& version) { return fmt::format("{}", version.to_string()); }), ", "), tablet_id(), spec_version.to_string(), path_max_version); - return capture_rs_readers_unlocked(version_path, rs_splits); + return version_path; } -bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) { +bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const { if (start_version > end_version) { return false; } @@ -247,11 +218,11 @@ bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t en return is_rowset_warmed_up(rs->rowset_id()); }; -Status CloudTablet::capture_rs_readers_with_freshness_tolerance( - const Version& spec_version, std::vector* rs_splits, - int64_t query_freshness_tolerance_ms) { +Result> CloudTablet::capture_versions_with_freshness_tolerance( + const Version& spec_version, const CaptureRowsetOps& options) const { g_capture_with_freshness_tolerance_count << 1; using namespace std::chrono; + auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms; auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms); // find a version path where every edge(rowset) has been warmuped Versions version_path; @@ -259,15 +230,17 @@ Status CloudTablet::capture_rs_readers_with_freshness_tolerance( if (enable_unique_key_merge_on_write()) { // For merge-on-write table, newly generated delete bitmap marks will be on the rowsets which are in newest layout. // So we can ony capture rowsets which are in newest data layout. Otherwise there may be data correctness issue. - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator_mow( - spec_version, version_path, [&](int64_t start, int64_t end) { - return rowset_is_warmed_up_unlocked(start, end); - })); + RETURN_IF_ERROR_RESULT( + _timestamped_version_tracker.capture_consistent_versions_with_validator_mow( + spec_version, version_path, [&](int64_t start, int64_t end) { + return rowset_is_warmed_up_unlocked(start, end); + })); } else { - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator( - spec_version, version_path, [&](int64_t start, int64_t end) { - return rowset_is_warmed_up_unlocked(start, end); - })); + RETURN_IF_ERROR_RESULT( + _timestamped_version_tracker.capture_consistent_versions_with_validator( + spec_version, version_path, [&](int64_t start, int64_t end) { + return rowset_is_warmed_up_unlocked(start, end); + })); } int64_t path_max_version = version_path.back().second; auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) -> bool { @@ -309,17 +282,17 @@ Status CloudTablet::capture_rs_readers_with_freshness_tolerance( g_capture_with_freshness_tolerance_fallback_count << 1; // if there exists a rowset which satisfies freshness tolerance and its start version is larger than the path max version // but has not been warmuped up yet, fallback to capture rowsets as usual - return capture_rs_readers_internal(spec_version, rs_splits); + return BaseTablet::capture_consistent_versions_unlocked(spec_version, options); } VLOG_DEBUG << fmt::format( - "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, capture path: {}, " + "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, " "tablet_id={}, spec_version={}, path_max_version={}", fmt::join(version_path | std::views::transform([](const auto& version) { return fmt::format("{}", version.to_string()); }), ", "), tablet_id(), spec_version.to_string(), path_max_version); - return capture_rs_readers_unlocked(version_path, rs_splits); + return version_path; } // There are only two tablet_states RUNNING and NOT_READY in cloud mode diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index e485a401221599..7b35b7c4d3fa94 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -81,18 +81,18 @@ class CloudTablet final : public BaseTablet { bool vertical) override; Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) override; - Status capture_rs_readers_internal(const Version& spec_version, - std::vector* rs_splits); + const CaptureRowsetOps& opts) override; - // Capture rowset readers with cache preference optimization. + [[nodiscard]] Result> capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const override; + + // Capture versions with cache preference optimization. // This method prioritizes using cached/warmed-up rowsets when building version paths, // avoiding cold data reads when possible. It uses capture_consistent_versions_prefer_cache // to find a consistent version path that prefers already warmed-up rowsets. - Status capture_rs_readers_prefer_cache(const Version& spec_version, - std::vector* rs_splits); + Result> capture_versions_prefer_cache(const Version& spec_version) const; - // Capture rowset readers with query freshness tolerance. + // Capture versions with query freshness tolerance. // This method finds a consistent version path where all rowsets are warmed up, // but allows fallback to normal capture if there are newer rowsets that should be // visible (based on freshness tolerance) but haven't been warmed up yet. @@ -102,16 +102,12 @@ class CloudTablet final : public BaseTablet { // data hasn't been warmed up yet. This can cause different tablets in the same query // to read from different versions, potentially leading to inconsistent query results. // - // @param query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that + // @param options.query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that // became visible within this time range (after current_time - query_freshness_tolerance_ms) // can be skipped if not warmed up. However, if older rowsets (before this time point) // are not warmed up, the method will fallback to normal capture. - Status capture_rs_readers_with_freshness_tolerance(const Version& spec_version, - std::vector* rs_splits, - int64_t query_freshness_tolerance_ms); - - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; + Result> capture_versions_with_freshness_tolerance( + const Version& spec_version, const CaptureRowsetOps& options) const; size_t tablet_footprint() override { return _approximate_data_size.load(std::memory_order_relaxed); @@ -352,7 +348,7 @@ class CloudTablet final : public BaseTablet { void add_warmed_up_rowset(const RowsetId& rowset_id); - std::string rowset_warmup_digest() { + std::string rowset_warmup_digest() const { std::string res; auto add_log = [&](const RowsetSharedPtr& rs) { auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string()); @@ -382,7 +378,7 @@ class CloudTablet final : public BaseTablet { std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now()); // used by capture_rs_reader_xxx functions - bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version); + bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const; CloudStorageEngine& _engine; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index d5414b6bac0db5..28f4b60e1da71b 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -159,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) { Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data, bool sync_delete_bitmap, SyncRowsetStats* sync_stats, - bool force_use_cache) { + bool local_only) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` class Value : public LRUCacheValueBase { public: @@ -177,12 +177,17 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CacheKey key(tablet_id_str); auto* handle = _cache->lookup(key); - if (handle == nullptr && force_use_cache) { - return ResultError( - Status::InternalError("failed to get cloud tablet from cache {}", tablet_id)); - } - if (handle == nullptr) { + if (local_only) { + LOG(INFO) << "tablet=" << tablet_id + << "does not exists in local tablet cache, because param local_only=true, " + "treat it as an error"; + return ResultError(Status::InternalError( + "tablet={} does not exists in local tablet cache, because param " + "local_only=true, " + "treat it as an error", + tablet_id)); + } if (sync_stats) { ++sync_stats->tablet_meta_cache_miss; } diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index ee0b807602da6c..d7dde2134acc18 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -47,7 +47,7 @@ class CloudTabletMgr { Result> get_tablet(int64_t tablet_id, bool warmup_data = false, bool sync_delete_bitmap = true, SyncRowsetStats* sync_stats = nullptr, - bool force_use_cache = false); + bool local_only = false); void erase_tablet(int64_t tablet_id); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 32683dc132edd8..a0c589439f27d5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1570,6 +1570,7 @@ DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false"); DEFINE_mBool(enable_update_delete_bitmap_kv_check_core, "false"); +DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false"); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index afba63bebe55a1..aca9ffbf1d14aa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1626,6 +1626,7 @@ DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently); DECLARE_mBool(enable_update_delete_bitmap_kv_check_core); +DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DECLARE_mInt32(segments_key_bounds_truncation_threshold); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 00a7db1d982b6e..9cde2bbdf45bdc 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1342,37 +1342,6 @@ void BaseTablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, } } -Status BaseTablet::_capture_consistent_rowsets_unlocked( - const std::vector& version_path, std::vector* rowsets) const { - DCHECK(rowsets != nullptr); - rowsets->reserve(version_path.size()); - for (const auto& version : version_path) { - bool is_find = false; - do { - auto it = _rs_version_map.find(version); - if (it != _rs_version_map.end()) { - is_find = true; - rowsets->push_back(it->second); - break; - } - - auto it_expired = _stale_rs_version_map.find(version); - if (it_expired != _stale_rs_version_map.end()) { - is_find = true; - rowsets->push_back(it_expired->second); - break; - } - } while (false); - - if (!is_find) { - return Status::Error( - "fail to find Rowset for version. tablet={}, version={}", tablet_id(), - version.to_string()); - } - } - return Status::OK(); -} - Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version, int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids, @@ -2143,6 +2112,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count( } } +void TabletReadSource::fill_delete_predicates() { + DCHECK_EQ(delete_predicates.size(), 0); + auto delete_pred_view = + rs_splits | std::views::transform([](auto&& split) { + return split.rs_reader->rowset()->rowset_meta(); + }) | + std::views::filter([](const auto& rs_meta) { return rs_meta->has_delete_predicate(); }); + delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()}; +} + int32_t BaseTablet::max_version_config() { int32_t max_version = tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY ? std::max(config::time_series_max_tablet_version_num, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 3e1dbd82053dca..d2762a8a53ec57 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -43,6 +43,10 @@ class CalcDeleteBitmapToken; class SegmentCacheHandle; class RowIdConversion; struct PartialUpdateInfo; +class PartialUpdateReadPlan; +struct CaptureRowsetOps; +struct CaptureRowsetResult; +struct TabletReadSource; class FixedReadPlan; struct TabletWithVersion { @@ -50,28 +54,6 @@ struct TabletWithVersion { int64_t version; }; -struct CaptureRsReaderOptions { - // Used by local mode only. - // If true, allows skipping missing versions during rowset capture. - // This can be useful when some versions are temporarily unavailable. - bool skip_missing_version {false}; - - // ======== only take effect in cloud mode ======== - - // Enable preference for cached/warmed-up rowsets when building version paths. - // When enabled, the capture process will prioritize already cached rowsets - // to avoid cold data reads and improve query performance. - bool enable_prefer_cached_rowset {false}; - - // Query freshness tolerance in milliseconds. - // Defines the time window for considering data as "fresh enough". - // Rowsets that became visible within this time range can be skipped if not warmed up, - // but older rowsets (before current_time - query_freshness_tolerance_ms) that are - // not warmed up will trigger fallback to normal capture. - // Set to -1 to disable freshness tolerance checking. - int64_t query_freshness_tolerance_ms {-1}; -}; - enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING }; // Base class for all tablet classes @@ -130,12 +112,9 @@ class BaseTablet : public std::enable_shared_from_this { virtual Result> create_rowset_writer(RowsetWriterContext& context, bool vertical) = 0; - virtual Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const = 0; - virtual Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) = 0; + const CaptureRowsetOps& opts) = 0; virtual size_t tablet_footprint() = 0; @@ -324,7 +303,7 @@ class BaseTablet : public std::enable_shared_from_this { } void traverse_rowsets_unlocked(std::function visitor, - bool include_stale = false) { + bool include_stale = false) const { for (auto& [v, rs] : _rs_version_map) { visitor(rs); } @@ -353,6 +332,18 @@ class BaseTablet : public std::enable_shared_from_this { void prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version); void prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset); + [[nodiscard]] Result capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] virtual Result> capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result> capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result capture_read_source(const Version& version_range, + const CaptureRowsetOps& options); + protected: // Find the missed versions until the spec_version. // @@ -370,11 +361,10 @@ class BaseTablet : public std::enable_shared_from_this { const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del); - Status _capture_consistent_rowsets_unlocked(const std::vector& version_path, - std::vector* rowsets) const; - Status sort_block(vectorized::Block& in_block, vectorized::Block& output_block); + Result _remote_capture_rowsets(const Version& version_range) const; + mutable std::shared_mutex _meta_lock; TimestampedVersionTracker _timestamped_version_tracker; // After version 0.13, all newly created rowsets are saved in _rs_version_map. @@ -413,4 +403,39 @@ class BaseTablet : public std::enable_shared_from_this { Status last_compaction_status = Status::OK(); }; +struct CaptureRowsetOps { + bool skip_missing_versions = false; + bool quiet = false; + bool include_stale_rowsets = true; + bool enable_fetch_rowsets_from_peers = false; + + // ======== only take effect in cloud mode ======== + + // Enable preference for cached/warmed-up rowsets when building version paths. + // When enabled, the capture process will prioritize already cached rowsets + // to avoid cold data reads and improve query performance. + bool enable_prefer_cached_rowset {false}; + + // Query freshness tolerance in milliseconds. + // Defines the time window for considering data as "fresh enough". + // Rowsets that became visible within this time range can be skipped if not warmed up, + // but older rowsets (before current_time - query_freshness_tolerance_ms) that are + // not warmed up will trigger fallback to normal capture. + // Set to -1 to disable freshness tolerance checking. + int64_t query_freshness_tolerance_ms {-1}; +}; + +struct CaptureRowsetResult { + std::vector rowsets; + std::shared_ptr delete_bitmap; +}; + +struct TabletReadSource { + std::vector rs_splits; + std::vector delete_predicates; + std::shared_ptr delete_bitmap; + // Fill delete predicates with `rs_splits` + void fill_delete_predicates(); +}; + } /* namespace doris */ diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 9f2c746ac92eb6..38d2a3fd97af86 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -140,8 +140,9 @@ Status FullCompaction::modify_rowsets() { int64_t max_version = tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 5e72bec59c0482..62077b9dd7e27e 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -73,7 +73,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -92,7 +92,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, } reader_params.tablet_schema = merge_tablet_schema; if (!tablet->tablet_schema()->cluster_key_uids().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); } if (stats_output && stats_output->rowid_conversion) { @@ -257,7 +257,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -277,7 +277,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet_schema = merge_tablet_schema; bool has_cluster_key = false; if (!tablet->tablet_schema()->cluster_key_uids().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); has_cluster_key = true; } diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 6b3940bcb87219..9ccfa347651e1c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -23,6 +23,7 @@ #include "cloud/cloud_tablet_hotspot.h" #include "cloud/config.h" #include "common/status.h" +#include "olap/base_tablet.h" #include "olap/rowset/beta_rowset.h" #include "olap/segment_loader.h" #include "pipeline/exec/olap_scan_operator.h" @@ -60,7 +61,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& // `rs_splits` in `entire read source` will be devided into several partitial read sources // to build several parallel scanners, based on segment rows number. All the partitial read sources // share the same delete predicates from their corresponding entire read source. - TabletReader::ReadSource partitial_read_source; + TabletReadSource partitial_read_source; int64_t rows_collected = 0; for (auto& rs_split : entire_read_source.rs_splits) { auto reader = rs_split.rs_reader; @@ -109,10 +110,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& partitial_read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back(_build_scanner( + tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); partitial_read_source = {}; split = RowSetSplits(reader->clone()); @@ -153,9 +155,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& split.segment_offsets.second - split.segment_offsets.first); } #endif - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); } } @@ -196,12 +200,14 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list // No row-ranges slicing; scan whole segment i. DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1); - TabletReader::ReadSource partitial_read_source; + TabletReadSource partitial_read_source; partitial_read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); } } } @@ -244,7 +250,7 @@ Status ParallelScannerBuilder::_load() { std::shared_ptr ParallelScannerBuilder::_build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source) { + TabletReadSource&& read_source) { OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), version, std::move(read_source), _limit, _is_preaggregation}; return OlapScanner::create_shared(_parent, std::move(params)); diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 358398847571b7..de1ea33b149e0c 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -22,6 +22,7 @@ #include #include +#include "olap/base_tablet.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/segment_loader.h" @@ -44,7 +45,7 @@ class ParallelScannerBuilder { public: ParallelScannerBuilder(pipeline::OlapScanLocalState* parent, const std::vector& tablets, - std::vector& read_sources, + std::vector& read_sources, const std::shared_ptr& profile, const std::vector& key_ranges, RuntimeState* state, int64_t limit, bool is_dup_mow_key, bool is_preaggregation) @@ -78,7 +79,7 @@ class ParallelScannerBuilder { std::shared_ptr _build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source); + TabletReadSource&& read_source); pipeline::OlapScanLocalState* _parent; @@ -111,8 +112,8 @@ class ParallelScannerBuilder { bool _is_preaggregation; std::vector _tablets; std::vector _key_ranges; - std::unordered_map _all_read_sources; - std::vector& _read_sources; + std::unordered_map _all_read_sources; + std::vector& _read_sources; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index c6e8dc718c76c4..67c6db392db67a 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -76,7 +76,7 @@ struct RowsetReaderContext { uint64_t* merged_rows = nullptr; // for unique key merge on write bool enable_unique_key_merge_on_write = false; - const DeleteBitmap* delete_bitmap = nullptr; + DeleteBitmapPtr delete_bitmap = nullptr; bool record_rowids = false; RowIdConversion* rowid_conversion = nullptr; bool is_key_column_group = false; diff --git a/be/src/olap/rowset_version_mgr.cpp b/be/src/olap/rowset_version_mgr.cpp new file mode 100644 index 00000000000000..bb815ce859a24e --- /dev/null +++ b/be/src/olap/rowset_version_mgr.cpp @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "cloud/config.h" +#include "common/status.h" +#include "cpp/sync_point.h" +#include "olap/base_tablet.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader.h" +#include "runtime/client_cache.h" +#include "service/backend_options.h" +#include "service/internal_service.h" +#include "util/brpc_client_cache.h" +#include "util/debug_points.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" + +namespace doris { + +using namespace ErrorCode; +using namespace std::ranges; + +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency( + "remote_fetch_rowsets_single_rpc"); +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets"); + +[[nodiscard]] Result> BaseTablet::capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + std::vector version_path; + auto st = + _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path); + if (!st && !options.quiet) { + auto missed_versions = get_missed_versions_unlocked(version_range.second); + if (missed_versions.empty()) { + LOG(WARNING) << fmt::format( + "version already has been merged. version_range={}, max_version={}, " + "tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id()); + return ResultError(Status::Error( + "missed versions is empty, version_range={}, max_version={}, tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id())); + } + LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}", + version_range.to_string(), tablet_id(), st); + _print_missed_versions(missed_versions); + if (!options.skip_missing_versions) { + return ResultError(std::move(st)); + } + LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); + } + DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", { + auto tablet_id = dp->param("tablet_id", -1); + auto skip_by_option = dp->param("skip_by_option", false); + if (skip_by_option && !options.enable_fetch_rowsets_from_peers) { + return version_path; + } + if ((tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id())) || tablet_id == -2) { + return ResultError(Status::Error("version already merged")); + } + }); + return version_path; +} + +[[nodiscard]] Result BaseTablet::capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + CaptureRowsetResult result; + auto& rowsets = result.rowsets; + auto maybe_versions = capture_consistent_versions_unlocked(version_range, options); + if (maybe_versions) { + const auto& version_paths = maybe_versions.value(); + rowsets.reserve(version_paths.size()); + + auto rowset_for_version = [&](const Version& version, + bool include_stale) -> Result { + if (auto it = _rs_version_map.find(version); it != _rs_version_map.end()) { + return it->second; + } else { + VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" + << tablet_id() << ", version='" << version.first << "-" + << version.second; + } + if (include_stale) { + if (auto it = _stale_rs_version_map.find(version); + it != _stale_rs_version_map.end()) { + return it->second; + } else { + LOG(WARNING) << fmt::format( + "fail to find Rowset in stale_rs_version for version. tablet={}, " + "version={}-{}", + tablet_id(), version.first, version.second); + } + } + return ResultError(Status::Error( + "failed to find rowset for version={}", version.to_string())); + }; + + for (const auto& version : version_paths) { + auto ret = rowset_for_version(version, options.include_stale_rowsets); + if (!ret) { + return ResultError(std::move(ret.error())); + } + + rowsets.push_back(std::move(ret.value())); + } + if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write()) { + result.delete_bitmap = _tablet_meta->delete_bitmap_ptr(); + } + return result; + } + + if (!config::is_cloud_mode() || !options.enable_fetch_rowsets_from_peers) { + return ResultError(std::move(maybe_versions.error())); + } + auto ret = _remote_capture_rowsets(version_range); + if (!ret) { + auto st = Status::Error( + "version already merged, meet error during remote capturing rowsets, " + "error={}, version_range={}", + ret.error().to_string(), version_range.to_string()); + return ResultError(std::move(st)); + } + return ret; +} + +[[nodiscard]] Result> BaseTablet::capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + auto maybe_rs_list = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_rs_list) { + return ResultError(std::move(maybe_rs_list.error())); + } + const auto& rs_list = maybe_rs_list.value().rowsets; + std::vector rs_splits; + rs_splits.reserve(rs_list.size()); + for (const auto& rs : rs_list) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + rs_splits.emplace_back(std::move(rs_reader)); + } + return rs_splits; +} + +[[nodiscard]] Result BaseTablet::capture_read_source( + const Version& version_range, const CaptureRowsetOps& options) { + std::shared_lock rdlock(get_header_lock()); + auto maybe_result = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_result) { + return ResultError(std::move(maybe_result.error())); + } + auto rowsets_result = std::move(maybe_result.value()); + TabletReadSource read_source; + read_source.delete_bitmap = std::move(rowsets_result.delete_bitmap); + const auto& rowsets = rowsets_result.rowsets; + read_source.rs_splits.reserve(rowsets.size()); + for (const auto& rs : rowsets) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + read_source.rs_splits.emplace_back(std::move(rs_reader)); + } + return read_source; +} + +template +bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) { + auto p_wrap_fn = new auto([=] { fn(args...); }); + auto call_back = [](void* ar) -> void* { + auto f = reinterpret_cast(ar); + (*f)(); + delete f; + return nullptr; + }; + return bthread_start_background(&th, attr, call_back, p_wrap_fn) == 0; +} + +struct GetRowsetsCntl : std::enable_shared_from_this { + struct RemoteGetRowsetResult { + std::vector rowsets; + std::unique_ptr delete_bitmap; + }; + + Status start_req_bg() { + task_cnt = req_addrs.size(); + for (const auto& [ip, port] : req_addrs) { + bthread_t tid; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + + bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() { + LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip; + Defer defer_log {[&ip, port]() { + LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip + << ", port=" << port; + }}; + + PGetTabletRowsetsRequest req; + req.set_tablet_id(self->tablet_id); + req.set_version_start(self->version_range.first); + req.set_version_end(self->version_range.second); + if (self->delete_bitmap_keys.has_value()) { + req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value()); + } + brpc::Controller cntl; + cntl.set_timeout_ms(60000); + cntl.set_max_retry(3); + PGetTabletRowsetsResponse response; + auto start_tm_us = MonotonicMicros(); +#ifndef BE_TEST + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port); + if (stub == nullptr) { + self->result = ResultError(Status::InternalError( + "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port)); + return; + } + stub->get_tablet_rowsets(&cntl, &req, &response, nullptr); +#else + TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response); +#endif + g_remote_fetch_tablet_rowsets_single_request_latency + << MonotonicMicros() - start_tm_us; + + std::unique_lock l(self->butex); + if (self->done) { + return; + } + --self->task_cnt; + auto resp_st = Status::create(response.status()); + DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure", + { resp_st = Status::InternalError("inject error"); }); + if (cntl.Failed() || !resp_st) { + if (self->task_cnt != 0) { + return; + } + std::stringstream reason; + reason << "failed to get rowsets from all replicas, tablet_id=" + << self->tablet_id; + if (cntl.Failed()) { + reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText(); + } else { + reason << ", reason=" << resp_st.to_string(); + } + self->result = ResultError(Status::InternalError(reason.str())); + self->done = true; + self->event.signal(); + return; + } + + Defer done_cb {[&]() { + self->done = true; + self->event.signal(); + }}; + std::vector rs_metas; + for (auto&& rs_pb : response.rowsets()) { + auto rs_meta = std::make_shared(); + if (!rs_meta->init_from_pb(rs_pb)) { + self->result = + ResultError(Status::InternalError("failed to init rowset from pb")); + return; + } + rs_metas.push_back(std::move(rs_meta)); + } + CaptureRowsetResult result; + self->result->rowsets = std::move(rs_metas); + + if (response.has_delete_bitmap()) { + self->result->delete_bitmap = std::make_unique( + DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id)); + } + }); + + if (!succ) { + return Status::InternalError( + "failed to create bthread when request rowsets for tablet={}", tablet_id); + } + } + return Status::OK(); + } + + Result wait_for_ret() { + event.wait(); + return std::move(result); + } + + int64_t tablet_id; + std::vector> req_addrs; + Version version_range; + std::optional delete_bitmap_keys = std::nullopt; + +private: + size_t task_cnt; + + bthread::Mutex butex; + bthread::CountdownEvent event {1}; + bool done = false; + + Result result; +}; + +Result>> get_peer_replicas_addresses( + const int64_t tablet_id) { + auto* cluster_info = ExecEnv::GetInstance()->cluster_info(); + DCHECK_NE(cluster_info, nullptr); + auto master_addr = cluster_info->master_fe_addr; + TGetTabletReplicaInfosRequest req; + req.tablet_ids.push_back(tablet_id); + TGetTabletReplicaInfosResult resp; + auto st = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); }); + if (!st) { + return ResultError(Status::InternalError( + "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(), + tablet_id)); + } + + auto it = resp.tablet_replica_infos.find(tablet_id); + if (it == resp.tablet_replica_infos.end()) { + return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id)); + } + auto replicas = it->second; + auto local_host = BackendOptions::get_localhost(); + bool include_local_host = false; + DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; }); + auto ret_view = + replicas | std::views::filter([&local_host, include_local_host](const auto& replica) { + return local_host.find(replica.host) == std::string::npos || include_local_host; + }) | + std::views::transform([](auto& replica) { + return std::make_pair(std::move(replica.host), replica.brpc_port); + }); + return std::vector(ret_view.begin(), ret_view.end()); +} + +Result BaseTablet::_remote_capture_rowsets( + const Version& version_range) const { + auto start_tm_us = MonotonicMicros(); + Defer defer { + [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }}; +#ifndef BE_TEST + auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id()); +#else + Result>> maybe_be_addresses; + TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses); +#endif + DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail", + { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); }); + if (!maybe_be_addresses) { + return ResultError(std::move(maybe_be_addresses.error())); + } + auto be_addresses = std::move(maybe_be_addresses.value()); + if (be_addresses.empty()) { + LOG(WARNING) << "no peers replica for tablet=" << tablet_id(); + return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id())); + } + + auto cntl = std::make_shared(); + cntl->tablet_id = tablet_id(); + cntl->req_addrs = std::move(be_addresses); + cntl->version_range = version_range; + bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write(); + CaptureRowsetResult result; + if (is_mow) { + result.delete_bitmap = + std::make_unique(_tablet_meta->delete_bitmap().snapshot()); + DeleteBitmapPB delete_bitmap_keys; + auto keyset = result.delete_bitmap->delete_bitmap | + std::views::transform([](const auto& kv) { return kv.first; }); + for (const auto& key : keyset) { + const auto& [rs_id, seg_id, version] = key; + delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string()); + delete_bitmap_keys.mutable_segment_ids()->Add(seg_id); + delete_bitmap_keys.mutable_versions()->Add(version); + } + cntl->delete_bitmap_keys = std::move(delete_bitmap_keys); + } + + RETURN_IF_ERROR_RESULT(cntl->start_req_bg()); + auto maybe_meta = cntl->wait_for_ret(); + if (!maybe_meta) { + auto err = Status::InternalError( + "tried to get rowsets from peer replicas and failed, " + "reason={}", + maybe_meta.error()); + return ResultError(std::move(err)); + } + + auto& remote_meta = maybe_meta.value(); + const auto& rs_metas = remote_meta.rowsets; + for (const auto& rs_meta : rs_metas) { + RowsetSharedPtr rs; + auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs); + if (!st) { + return ResultError(std::move(st)); + } + result.rowsets.push_back(std::move(rs)); + } + if (is_mow) { + DCHECK_NE(result.delete_bitmap, nullptr); + result.delete_bitmap->merge(*remote_meta.delete_bitmap); + } + return result; +} + +} // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 427badee862f40..cc1879fce88b43 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1020,7 +1020,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr(); reader_context.version = Version(0, end_version); if (!_base_tablet_schema->cluster_key_uids().empty()) { for (const auto& uid : _base_tablet_schema->cluster_key_uids()) { @@ -1144,9 +1144,8 @@ Status SchemaChangeJob::_get_versions_to_be_changed(std::vector* versio } *max_rowset = rowset; - RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked( - Version(0, rowset->version().second), versions_to_be_changed, false, false)); - + *versions_to_be_changed = DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked( + Version(0, rowset->version().second), {})); return Status::OK(); } @@ -1559,8 +1558,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) << "double write rowsets for version: " << alter_version + 1 << "-" << max_version << " new_tablet=" << _new_tablet->tablet_id(); std::shared_lock rlock(_new_tablet->get_header_lock()); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {alter_version + 1, max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {alter_version + 1, max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto rowset_ptr : rowsets) { std::lock_guard rwlock(_new_tablet->get_rowset_update_lock()); @@ -1578,8 +1578,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " << "incremental rowsets for version: " << max_version + 1 << "-" << new_max_version << " new_tablet=" << _new_tablet->tablet_id(); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto&& rowset_ptr : rowsets) { RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr)); diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index dad9e22c0578ed..899a4bef265dd2 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -568,13 +568,22 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet res = check_version_continuity(consistent_rowsets); if (res.ok() && max_cooldowned_version < version) { // Pick consistent rowsets of remaining required version - res = ref_tablet->capture_consistent_rowsets_unlocked( - {max_cooldowned_version + 1, version}, &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked( + {max_cooldowned_version + 1, version}, CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } } else { - // get shortest version path - res = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), - &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), + CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } if (!res.ok()) { LOG(WARNING) << "fail to select versions to span. res=" << res; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 69791626ab8a1b..de64b3a70cd8f8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -395,12 +395,14 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, } if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) { - calc_bm_status = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, - &calc_delete_bitmap_rowsets); - if (!calc_bm_status.ok()) { - LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << calc_bm_status; + auto ret = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, + CaptureRowsetOps {}); + if (!ret) { + LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << ret.error(); + calc_bm_status = std::move(ret.error()); break; } + calc_delete_bitmap_rowsets = std::move(ret->rowsets); // FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter auto self = _engine.tablet_manager()->get_tablet(tablet_id()); CHECK(self); @@ -455,17 +457,16 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, // that we can capture by version if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { Version full_version = Version(0, max_version_unlocked()); - std::vector expected_rowsets; - auto st = capture_consistent_rowsets_unlocked(full_version, &expected_rowsets); - DCHECK(st.ok()) << st; - DCHECK_EQ(base_rowsets_for_full_clone.size(), expected_rowsets.size()); - if (st.ok() && base_rowsets_for_full_clone.size() != expected_rowsets.size()) - [[unlikely]] { + auto ret = capture_consistent_rowsets_unlocked(full_version, CaptureRowsetOps {}); + DCHECK(ret) << ret.error(); + DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size()); + + if (ret && base_rowsets_for_full_clone.size() != ret->rowsets.size()) [[unlikely]] { LOG(WARNING) << "full clone succeeded, but the count(" << base_rowsets_for_full_clone.size() << ") of base rowsets used for delete bitmap calculation is not match " "expect count(" - << expected_rowsets.size() << ") we capture from tablet meta"; + << ret->rowsets.size() << ") we capture from tablet meta"; } } } @@ -552,7 +553,8 @@ Status Tablet::modify_rowsets(std::vector& to_add, tablet_id()); } else if (rs->rowset_id() != it->second->rowset_id()) { return Status::Error( - "try to delete version {} from {}, but rowset id changed, delete rowset id " + "try to delete version {} from {}, but rowset id changed, delete " + "rowset id " "is {}, exists rowsetid is {}", rs->version().to_string(), tablet_id(), rs->rowset_id().to_string(), it->second->rowset_id().to_string()); @@ -773,10 +775,9 @@ void Tablet::delete_expired_stale_rowset() { Version test_version = Version(0, lastest_delta->end_version()); stale_version_path_map[*path_id_iter] = version_path; - Status status = - capture_consistent_versions_unlocked(test_version, nullptr, false, false); + auto ret = capture_consistent_versions_unlocked(test_version, {}); // 1. When there is no consistent versions, we must reconstruct the tracker. - if (!status.ok()) { + if (!ret) { // 2. fetch missing version after delete Versions after_missed_versions = get_missed_versions_unlocked(lastest_delta->end_version()); @@ -923,51 +924,11 @@ void Tablet::delete_expired_stale_rowset() { { _engine.start_delete_unused_rowset(); }); } -Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, - Versions* version_path, - bool skip_missing_version, bool quiet) const { - Status status = - _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (!status.ok() && !quiet) { - Versions missed_versions = get_missed_versions_unlocked(spec_version.second); - if (missed_versions.empty()) { - // if version_path is null, it may be a compaction check logic. - // so to avoid print too many logs. - if (version_path != nullptr) { - LOG(WARNING) << "tablet:" << tablet_id() - << ", version already has been merged. spec_version: " << spec_version - << ", max_version: " << max_version_unlocked(); - } - status = Status::Error( - "versions are already compacted, spec_version " - "{}, max_version {}, tablet_id {}", - spec_version.second, max_version_unlocked(), tablet_id()); - } else { - if (version_path != nullptr) { - LOG(WARNING) << "status:" << status << ", tablet:" << tablet_id() - << ", missed version for version:" << spec_version; - _print_missed_versions(missed_versions); - if (skip_missing_version) { - LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); - return Status::OK(); - } - } - } - } - - DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", { - auto tablet_id = dp->param("tablet_id", -1); - if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) { - status = Status::Error("version already merged"); - } - }); - - return status; -} - Status Tablet::check_version_integrity(const Version& version, bool quiet) { std::shared_lock rdlock(_meta_lock); - return capture_consistent_versions_unlocked(version, nullptr, false, quiet); + [[maybe_unused]] auto _versions = DORIS_TRY( + capture_consistent_versions_unlocked(version, CaptureRowsetOps {.quiet = quiet})); + return Status::OK(); } bool Tablet::exceed_version_limit(int32_t limit) { @@ -997,22 +958,12 @@ void Tablet::acquire_version_and_rowsets( } } -Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version, - std::vector* rowsets) const { - std::vector version_path; - RETURN_IF_ERROR( - capture_consistent_versions_unlocked(spec_version, &version_path, false, false)); - RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, rowsets)); - return Status::OK(); -} - Status Tablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) { + const CaptureRowsetOps& opts) { std::shared_lock rlock(_meta_lock); std::vector version_path; - RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, &version_path, - opts.skip_missing_version, false)); - RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits)); + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions})); return Status::OK(); } @@ -2065,7 +2016,8 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) { std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock); if (!schema_change_lock.owns_lock()) { return Status::Error( - "try schema_change_lock failed, schema change running or inverted index built on " + "try schema_change_lock failed, schema change running or inverted index built " + "on " "this tablet={}", tablet_id()); } @@ -2353,7 +2305,8 @@ Status Tablet::_follow_cooldowned_data() { } } else if (!rs->is_local()) { return Status::InternalError( - "cooldowned version larger than that to follow with cooldown version {}", + "cooldowned version larger than that to follow with cooldown version " + "{}", cooldowned_version); } } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1b7e809eec3f94..372c141cc5c208 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -182,24 +182,15 @@ class Tablet final : public BaseTablet { /// need to delete flag. void delete_expired_stale_rowset(); - // Given spec_version, find a continuous version path and store it in version_path. - // If quiet is true, then only "does this path exist" is returned. - // If skip_missing_version is true, return ok even there are missing versions. - Status capture_consistent_versions_unlocked(const Version& spec_version, Versions* version_path, - bool skip_missing_version, bool quiet) const; - // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets( std::vector>* version_rowsets) const; - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; - - // If opts.skip_missing_version is true, skip versions if they are missing. + // If skip_missing_version is true, skip versions if they are missing. Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) override; + const CaptureRowsetOps& opts) override; // Find the missed versions until the spec_version. // diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 652b8bf6c25927..dc589b9a9925f5 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1254,6 +1254,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) noexcept { return *this; } +DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t tablet_id) { + size_t len = pb.rowset_ids().size(); + DCHECK_EQ(len, pb.segment_ids().size()); + DCHECK_EQ(len, pb.versions().size()); + DeleteBitmap delete_bitmap(tablet_id); + for (int32_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(pb.rowset_ids(i)); + BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)}; + delete_bitmap.delete_bitmap[key] = + roaring::Roaring::read(pb.segment_delete_bitmaps(i).data()); + } + return delete_bitmap; +} + +DeleteBitmapPB DeleteBitmap::to_pb() { + std::shared_lock l(lock); + DeleteBitmapPB ret; + for (const auto& [k, v] : delete_bitmap) { + ret.mutable_rowset_ids()->Add(std::get<0>(k).to_string()); + ret.mutable_segment_ids()->Add(std::get<1>(k)); + ret.mutable_versions()->Add(std::get<2>(k)); + std::string bitmap_data(v.getSizeInBytes(), '\0'); + v.write(bitmap_data.data()); + ret.mutable_segment_delete_bitmaps()->Add(std::move(bitmap_data)); + } + return ret; +} + DeleteBitmap DeleteBitmap::snapshot() const { std::shared_lock l(lock); return DeleteBitmap(*this); @@ -1711,6 +1740,22 @@ std::shared_ptr DeleteBitmap::get_agg_without_cache( return bitmap; } +DeleteBitmap DeleteBitmap::diffset(const std::set& key_set) const { + std::shared_lock l(lock); + auto diff_key_set_view = + delete_bitmap | std::ranges::views::transform([](const auto& kv) { return kv.first; }) | + std::ranges::views::filter( + [&key_set](const auto& key) { return !key_set.contains(key); }); + + DeleteBitmap dbm(_tablet_id); + for (const auto& key : diff_key_set_view) { + const auto* bitmap = get(key); + DCHECK_NE(bitmap, nullptr); + dbm.delete_bitmap[key] = *bitmap; + } + return dbm; +} + std::string tablet_state_name(TabletState state) { switch (state) { case TABLET_NOTREADY: diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index c86d9ad553a6f5..dcefc309a44da6 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -444,6 +444,10 @@ class DeleteBitmap { DeleteBitmap(DeleteBitmap&& r) noexcept; DeleteBitmap& operator=(DeleteBitmap&& r) noexcept; + static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id); + + DeleteBitmapPB to_pb(); + /** * Makes a snapshot of delete bitmap, read lock will be acquired in this * process @@ -608,6 +612,14 @@ class DeleteBitmap { void set_tablet_id(int64_t tablet_id); + /** + * Calculate diffset with given `key_set`. All entries with keys contained in this delete bitmap but not + * in given key_set will be added to the output delete bitmap. + * + * @return Deletebitmap containning all entries in diffset + */ + DeleteBitmap diffset(const std::set& key_set) const; + private: DeleteBitmap::Version _get_rowset_cache_version(const BitmapKey& bmk) const; diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 7eb53414ddbfd5..27668d93fef81c 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -102,16 +102,6 @@ std::string TabletReader::KeysParam::to_string() const { return ss.str(); } -void TabletReader::ReadSource::fill_delete_predicates() { - DCHECK_EQ(delete_predicates.size(), 0); - for (auto&& split : rs_splits) { - auto& rs_meta = split.rs_reader->rowset()->rowset_meta(); - if (rs_meta->has_delete_predicate()) { - delete_predicates.push_back(rs_meta); - } - } -} - TabletReader::~TabletReader() { for (auto* pred : _col_predicates) { delete pred; @@ -680,7 +670,7 @@ Status TabletReader::init_reader_params_and_create_block( reader_params->version = Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version()); - ReadSource read_source; + TabletReadSource read_source; for (const auto& rowset : input_rowsets) { RowsetReaderSharedPtr rs_reader; RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); @@ -702,9 +692,6 @@ Status TabletReader::init_reader_params_and_create_block( merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); } reader_params->tablet_schema = merge_tablet_schema; - if (tablet->enable_unique_key_merge_on_write()) { - reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } reader_params->return_columns.resize(read_tablet_schema->num_columns()); std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0); diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 81fb03ef7b2411..a8179f31773aed 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -33,6 +33,7 @@ #include "common/status.h" #include "exprs/function_filter.h" #include "io/io_common.h" +#include "olap/base_tablet.h" #include "olap/delete_handler.h" #include "olap/filter_olap_param.h" #include "olap/iterators.h" @@ -91,12 +92,6 @@ class TabletReader { }; public: - struct ReadSource { - std::vector rs_splits; - std::vector delete_predicates; - // Fill delete predicates with `rs_splits` - void fill_delete_predicates(); - }; // Params for Reader, // mainly include tablet, data version and fetch range. struct ReaderParams { @@ -117,9 +112,14 @@ class TabletReader { return BeExecVersionManager::get_newest_version(); } - void set_read_source(ReadSource read_source) { + void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { rs_splits = std::move(read_source.rs_splits); delete_predicates = std::move(read_source.delete_predicates); +#ifndef BE_TEST + if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) { + delete_bitmap = std::move(read_source.delete_bitmap); + } +#endif } BaseTabletSPtr tablet; @@ -148,7 +148,7 @@ class TabletReader { std::vector rs_splits; // For unique key table with merge-on-write - DeleteBitmap* delete_bitmap = nullptr; + DeleteBitmapPtr delete_bitmap = nullptr; // return_columns is init from query schema std::vector return_columns; diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 05ecfc0401b6d0..9dbaeabad30d70 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -81,13 +81,15 @@ Status EngineChecksumTask::_compute_checksum() { vectorized::Block block; { std::shared_lock rdlock(tablet->get_header_lock()); - Status acquire_reader_st = - tablet->capture_consistent_rowsets_unlocked(version, &input_rowsets); - if (!acquire_reader_st.ok()) { + auto ret = tablet->capture_consistent_rowsets_unlocked(version, CaptureRowsetOps {}); + if (ret) { + input_rowsets = std::move(ret->rowsets); + } else { LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->tablet_id() - << "res=" << acquire_reader_st; - return acquire_reader_st; + << "res=" << ret.error(); + return std::move(ret.error()); } + RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block( tablet, ReaderType::READER_CHECKSUM, input_rowsets, &reader_params, &block)); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index e7935eae55ff77..497837a9f6bddd 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -33,6 +33,7 @@ #include "common/config.h" #include "common/logging.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -87,8 +88,10 @@ Status EngineStorageMigrationTask::_get_versions(int64_t start_version, int64_t* << ", start_version=" << start_version << ", end_version=" << *end_version; return Status::OK(); } - return _tablet->capture_consistent_rowsets_unlocked(Version(start_version, *end_version), - consistent_rowsets); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + Version(start_version, *end_version), CaptureRowsetOps {})); + *consistent_rowsets = std::move(ret.rowsets); + return Status::OK(); } bool EngineStorageMigrationTask::_is_timeout() { diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 83f0abb498eb94..b8805897f54af4 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -715,16 +715,15 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { } } - CaptureRsReaderOptions opts { - .skip_missing_version = _state->skip_missing_version(), - .enable_prefer_cached_rowset = - config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, - .query_freshness_tolerance_ms = - config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1, - }; for (size_t i = 0; i < _scan_ranges.size(); i++) { - RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers({0, _tablets[i].version}, - &_read_sources[i].rs_splits, opts)); + _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source( + {0, _tablets[i].version}, + {.skip_missing_versions = _state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas, + .enable_prefer_cached_rowset = + config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, + .query_freshness_tolerance_ms = + config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1})); if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { _read_sources[i].fill_delete_predicates(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index f3b37c7fc89ac5..2db857b6e15d40 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -286,7 +286,7 @@ class OlapScanLocalState final : public ScanLocalState { RuntimeProfile::Counter* _variant_subtree_sparse_iter_count = nullptr; std::vector _tablets; - std::vector _read_sources; + std::vector _read_sources; std::map _slot_id_to_virtual_column_expr; std::map _slot_id_to_index_in_block; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2ff23ff97d0ddb..daf45179f79914 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -54,6 +54,9 @@ #include #include +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/config.h" #include "common/exception.h" #include "common/logging.h" @@ -153,6 +156,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit: DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); +static bvar::LatencyRecorder g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets"); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -2318,5 +2323,68 @@ void PInternalService::abort_refresh_dictionary(google::protobuf::RpcController* request->version_id()); st.to_protobuf(response->mutable_status()); } + +void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) { + DCHECK(config::is_cloud_mode()); + auto start_time = GetMonoTimeMicros(); + Defer defer { + [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }}; + brpc::ClosureGuard closure_guard(done); + LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString(); + if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) { + Status::InvalidArgument("missing params tablet/version_start/version_end") + .to_protobuf(response->mutable_status()); + return; + } + CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud(); + + auto maybe_tablet = + storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false, + /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr, + /*local_only*/ true); + if (!maybe_tablet) { + maybe_tablet.error().to_protobuf(response->mutable_status()); + return; + } + auto tablet = maybe_tablet.value(); + Result ret; + { + std::shared_lock l(tablet->get_header_lock()); + ret = tablet->capture_consistent_rowsets_unlocked( + {request->version_start(), request->version_end()}, + CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false}); + } + if (!ret) { + ret.error().to_protobuf(response->mutable_status()); + return; + } + auto rowsets = std::move(ret.value().rowsets); + for (const auto& rs : rowsets) { + RowsetMetaPB meta; + rs->rowset_meta()->to_rowset_pb(&meta); + response->mutable_rowsets()->Add(std::move(meta)); + } + if (request->has_delete_bitmap_keys()) { + DCHECK(tablet->enable_unique_key_merge_on_write()); + auto delete_bitmap = std::move(ret.value().delete_bitmap); + auto keys_pb = request->delete_bitmap_keys(); + size_t len = keys_pb.rowset_ids().size(); + DCHECK_EQ(len, keys_pb.segment_ids().size()); + DCHECK_EQ(len, keys_pb.versions().size()); + std::set keys; + for (size_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(keys_pb.rowset_ids(i)); + keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i)); + } + auto diffset = delete_bitmap->diffset(keys).to_pb(); + *response->mutable_delete_bitmap() = std::move(diffset); + } + Status::OK().to_protobuf(response->mutable_status()); +} + #include "common/compile_check_avoid_end.h" } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 5ebef074b960c0..d73501bfc80861 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -227,6 +227,11 @@ class PInternalService : public PBackendService { PAbortRefreshDictionaryResponse* response, google::protobuf::Closure* done) override; + void get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 0d9c0f0698fb27..afb9b546c49bf3 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -65,7 +65,7 @@ namespace doris::vectorized { #include "common/compile_check_avoid_begin.h" -using ReadSource = TabletReader::ReadSource; +using ReadSource = TabletReadSource; OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent, OlapScanner::Params&& params) : Scanner(params.state, parent, params.limit, params.profile), @@ -97,7 +97,8 @@ OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent, OlapScanner::Para .score_runtime {}, .collection_statistics {}, .ann_topn_runtime {}}) { - _tablet_reader_params.set_read_source(std::move(params.read_source)); + _tablet_reader_params.set_read_source(std::move(params.read_source), + _state->skip_delete_bitmap()); _has_prepared = false; _vector_search_params = params.state->get_vector_search_params(); } @@ -218,19 +219,26 @@ Status OlapScanner::prepare() { ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); } - CaptureRsReaderOptions opts { - .skip_missing_version = _state->skip_missing_version(), - .enable_prefer_cached_rowset = - config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, - .query_freshness_tolerance_ms = - config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1, - }; - auto st = tablet->capture_rs_readers(_tablet_reader_params.version, - &read_source.rs_splits, opts); - if (!st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << st; - return st; + auto maybe_read_source = tablet->capture_read_source( + _tablet_reader_params.version, + { + .skip_missing_versions = _state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = + config::enable_fetch_rowsets_from_peer_replicas, + .enable_prefer_cached_rowset = + config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() + : false, + .query_freshness_tolerance_ms = + config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() + : -1, + }); + if (!maybe_read_source) { + LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error(); + return maybe_read_source.error(); } + + read_source = std::move(maybe_read_source.value()); + if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) { LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", tablet->tablet_id(), print_id(_state->query_id())); @@ -357,7 +365,6 @@ Status OlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - auto& tablet = _tablet_reader_params.tablet; auto& tablet_schema = _tablet_reader_params.tablet_schema; // Merge the columns in delete predicate that not in latest schema in to current tablet schema for (auto& del_pred : _tablet_reader_params.delete_predicates) { @@ -417,10 +424,6 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.use_page_cache = _state->enable_page_cache(); - if (tablet->enable_unique_key_merge_on_write() && !_state->skip_delete_bitmap()) { - _tablet_reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } - DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); if (!_state->skip_storage_engine_merge()) { diff --git a/be/src/vec/exec/scan/olap_scanner.h b/be/src/vec/exec/scan/olap_scanner.h index 8f14889222b04c..27e09f298172f2 100644 --- a/be/src/vec/exec/scan/olap_scanner.h +++ b/be/src/vec/exec/scan/olap_scanner.h @@ -66,7 +66,7 @@ class OlapScanner : public Scanner { std::vector key_ranges; BaseTabletSPtr tablet; int64_t version; - TabletReader::ReadSource read_source; + TabletReadSource read_source; int64_t limit; bool aggregation; }; diff --git a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp index 8d4d5a37bf7a49..464afb9fc6cf1f 100644 --- a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp +++ b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp @@ -130,12 +130,12 @@ class TestQueryPreferCache : public testing::Test { void check_capture_result(CloudTabletSPtr tablet, Version spec_version, const std::vector& expected_versions) { - std::vector rs_splits; - CaptureRsReaderOptions opts {.skip_missing_version = false, - .enable_prefer_cached_rowset = true, - .query_freshness_tolerance_ms = -1}; - auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts); - ASSERT_TRUE(st.ok()); + CaptureRowsetOps opts {.skip_missing_versions = false, + .enable_prefer_cached_rowset = true, + .query_freshness_tolerance_ms = -1}; + auto res = tablet->capture_read_source(spec_version, opts); + ASSERT_TRUE(res.has_value()); + std::vector rs_splits = std::move(res.value().rs_splits); auto dump_versions = [](const std::vector& expected_versions, const std::vector& splits) { std::vector expected_str; @@ -801,4 +801,4 @@ TEST_F(TestQueryPreferCache, testCapture_4_1) { check_capture_result(tablet, Version {0, 18}, expected_versions); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp index 5fe5c2e51bcc50..1a24ea275be007 100644 --- a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp +++ b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp @@ -128,12 +128,12 @@ class TestFreshnessTolerance : public testing::Test { void check_capture_result(CloudTabletSPtr tablet, Version spec_version, int64_t query_freshness_tolerance_ms, const std::vector& expected_versions) { - std::vector rs_splits; - CaptureRsReaderOptions opts {.skip_missing_version = false, - .enable_prefer_cached_rowset = false, - .query_freshness_tolerance_ms = query_freshness_tolerance_ms}; - auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts); - ASSERT_TRUE(st.ok()); + CaptureRowsetOps opts {.skip_missing_versions = false, + .enable_prefer_cached_rowset = false, + .query_freshness_tolerance_ms = query_freshness_tolerance_ms}; + auto res = tablet->capture_read_source(spec_version, opts); + ASSERT_TRUE(res.has_value()); + std::vector rs_splits = std::move(res.value().rs_splits); auto dump_versions = [](const std::vector& expected_versions, const std::vector& splits) { std::vector expected_str; diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index a7e3c9fd706741..1dc240d2adafd6 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -204,6 +204,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { rowset_writer_context->tablet_schema = tablet_schema; rowset_writer_context->version.first = 10; rowset_writer_context->version.second = 10; + rowset_writer_context->enable_segcompaction = true; TabletMetaSharedPtr tablet_meta = std::make_shared(); tablet_meta->_tablet_id = TABLET_ID; @@ -238,7 +239,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns; reader_context.stats = &_stats; - reader_context.delete_bitmap = delete_bitmap.get(); + reader_context.delete_bitmap = delete_bitmap; Status s; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 7bd487e42c4aad..3fc7c652f589c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -613,4 +613,20 @@ public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimes return; } } + + public List getAllPrimaryBes() { + List result = new ArrayList(); + primaryClusterToBackends.keySet().forEach(clusterId -> { + List backendIds = primaryClusterToBackends.get(clusterId); + if (backendIds == null || backendIds.isEmpty()) { + return; + } + Long beId = backendIds.get(0); + if (beId != -1) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + result.add(backend); + } + }); + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ee578b378ba633..dd3a6836246b48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2781,29 +2781,36 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos LOG.warn("replica {} not normal", replica.getId()); continue; } - Backend backend; - if (Config.isCloudMode() && request.isSetWarmUpJobId()) { + List backends; + if (Config.isCloudMode()) { CloudReplica cloudReplica = (CloudReplica) replica; - // On the cloud, the PrimaryBackend of a tablet indicates the BE where the tablet is stably located, - // while the SecondBackend refers to a BE selected by a new hash when the PrimaryBackend - // is temporarily unavailable. Once the PrimaryBackend recovers, - // the system will switch back to using it. During the preheating phase, - // data needs to be synchronized downstream, which requires a stable BE, - // so the PrimaryBackend is used in this case. - backend = cloudReplica.getPrimaryBackend(clusterId, true); + if (!request.isSetWarmUpJobId()) { + backends = cloudReplica.getAllPrimaryBes(); + } else { + // On the cloud, the PrimaryBackend of a tablet + // indicates the BE where the tablet is stably located, + // while the SecondBackend refers to a BE selected by a new hash when the PrimaryBackend + // is temporarily unavailable. Once the PrimaryBackend recovers, + // the system will switch back to using it. During the preheating phase, + // data needs to be synchronized downstream, which requires a stable BE, + // so the PrimaryBackend is used in this case. + Backend backend = cloudReplica.getPrimaryBackend(clusterId, true); + backends = Lists.newArrayList(backend); + } } else { - backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); + Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); + backends = Lists.newArrayList(backend); } - if (backend != null) { - TReplicaInfo replicaInfo = new TReplicaInfo(); - replicaInfo.setHost(backend.getHost()); - replicaInfo.setBePort(backend.getBePort()); - replicaInfo.setHttpPort(backend.getHttpPort()); - replicaInfo.setBrpcPort(backend.getBrpcPort()); - replicaInfo.setIsAlive(backend.isAlive()); - replicaInfo.setBackendId(backend.getId()); - replicaInfo.setReplicaId(replica.getId()); - replicaInfos.add(replicaInfo); + for (Backend backend : backends) { + if (backend != null) { + TReplicaInfo replicaInfo = new TReplicaInfo(); + replicaInfo.setHost(backend.getHost()); + replicaInfo.setBePort(backend.getBePort()); + replicaInfo.setHttpPort(backend.getHttpPort()); + replicaInfo.setBrpcPort(backend.getBrpcPort()); + replicaInfo.setReplicaId(replica.getId()); + replicaInfos.add(replicaInfo); + } } } tabletReplicaInfos.put(tabletId, replicaInfos); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8bbc80adfbb70a..1ddfbcf2502442 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1066,6 +1066,21 @@ message PAbortRefreshDictionaryResponse { optional PStatus status = 1; } +message PGetTabletRowsetsRequest { + optional int64 tablet_id = 1; + optional int64 version_start = 2; + optional int64 version_end = 3; + + optional DeleteBitmapPB delete_bitmap_keys = 4; +} + +message PGetTabletRowsetsResponse { + required PStatus status = 1; + repeated RowsetMetaPB rowsets = 2; + + optional DeleteBitmapPB delete_bitmap = 3; +} + service PBackendService { // If #fragments of a query is < 3, use exec_plan_fragment directly. // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start @@ -1121,5 +1136,6 @@ service PBackendService { rpc delete_dictionary(PDeleteDictionaryRequest) returns (PDeleteDictionaryResponse); rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns (PCommitRefreshDictionaryResponse); rpc abort_refresh_dictionary(PAbortRefreshDictionaryRequest) returns (PAbortRefreshDictionaryResponse); + rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns (PGetTabletRowsetsResponse); }; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out new file mode 100644 index 00000000000000..78964812ebfb24 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index aa533b0f89f88f..4c22d3bf86971c 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -90,3 +90,4 @@ enable_parquet_page_index=true enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true +enable_fetch_rowsets_from_peer_replicas = true diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy new file mode 100644 index 00000000000000..0dd1f92dc18274 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.util.NodeType + + +suite("test_cloud_version_already_merged", "nonConcurrent") { + if (!isCloudMode()) { + return + } + def tblName = "test_cloud_version_already_merged" + sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tblName} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); + """ + + + sql "insert into ${tblName} values(1,-1,-1,-1);" + sql "insert into ${tblName} values(2,-2,-2,-2);" + sql "insert into ${tblName} values(3,-3,-3,-3);" + sql "insert into ${tblName} values(4,-4,-4,-4)" + sql "insert into ${tblName} values(5,-5,-5,-5)" + sql "insert into ${tblName} values(1,1,1,1);" + sql "insert into ${tblName} values(2,2,2,2);" + sql "insert into ${tblName} values(3,3,3,3);" + sql "insert into ${tblName} values(4,4,4,4)" + sql "insert into ${tblName} values(5,5,5,5)" + + + + + sql "sync;" + qt_sql "select * from ${tblName} order by k1;" + + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${tblName};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + sleep(10000) + + try { + GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure"); + + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail"); + + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + qt_sql """ SELECT * from ${tblName} ORDER BY k1 """ + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} +