From 51f120ccdfc6f8a6afdf4f1094e64c5cf2bdff34 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 23 Oct 2025 15:26:45 +0800 Subject: [PATCH] [opt](rowset) Remote fetch rowsets to avoid -230 error when capturing rowsets (#52995) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related PR: #52440 In read-write splitting scenarios, some BE (Backend) nodes may have already merged certain rowset versions, while another BE still attempts to capture or access those rowsets. When this happens, the BE reports error E-230 (versions already merged), causing data access or synchronization to fail. This PR introduces a remote rowset fetching mechanism, allowing a BE that lacks the required rowset to fetch it from other BE nodes, instead of failing with E-230. - Added a remote fetch mechanism in the rowset management layer: When a BE detects that a rowset is missing locally but has already been merged, it will try to fetch the rowset from other BE nodes. - Updated version and state checking logic to correctly identify the “merged but missing” condition. - Adjusted the rowset access path to trigger remote fetch rather than throwing an immediate error. - Added tests (unit/integration) to cover the new logic where applicable. - Ensured backward compatibility: If the BE already has the rowset locally or read-write splitting is not enabled, the behavior remains unchanged. Introduce a remote rowset fetching mechanism to prevent E-230 (“versions already merged”) errors in read-write splitting scenarios. This improves BE fault tolerance when some nodes have merged versions that others have not yet synchronized. --- be/src/cloud/cloud_full_compaction.cpp | 5 +- be/src/cloud/cloud_schema_change_job.cpp | 18 +- be/src/cloud/cloud_tablet.cpp | 107 ++--- be/src/cloud/cloud_tablet.h | 28 +- be/src/cloud/cloud_tablet_mgr.cpp | 17 +- be/src/cloud/cloud_tablet_mgr.h | 2 +- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/olap/base_tablet.cpp | 41 +- be/src/olap/base_tablet.h | 85 ++-- be/src/olap/full_compaction.cpp | 5 +- be/src/olap/merger.cpp | 8 +- be/src/olap/parallel_scanner_builder.cpp | 32 +- be/src/olap/parallel_scanner_builder.h | 9 +- be/src/olap/rowset/rowset_reader_context.h | 2 +- be/src/olap/rowset_version_mgr.cpp | 449 ++++++++++++++++++ be/src/olap/schema_change.cpp | 17 +- be/src/olap/snapshot_manager.cpp | 19 +- be/src/olap/tablet.cpp | 99 +--- be/src/olap/tablet.h | 13 +- be/src/olap/tablet_meta.cpp | 45 ++ be/src/olap/tablet_meta.h | 12 + be/src/olap/tablet_reader.cpp | 15 +- be/src/olap/tablet_reader.h | 16 +- be/src/olap/task/engine_checksum_task.cpp | 12 +- .../task/engine_storage_migration_task.cpp | 7 +- be/src/pipeline/exec/olap_scan_operator.cpp | 17 +- be/src/pipeline/exec/olap_scan_operator.h | 2 +- be/src/service/internal_service.cpp | 68 +++ be/src/service/internal_service.h | 5 + be/src/vec/exec/scan/olap_scanner.cpp | 41 +- be/src/vec/exec/scan/olap_scanner.h | 2 +- .../cloud_tablet_query_prefer_cache_test.cpp | 14 +- ...cloud_tablet_query_with_tolerance_test.cpp | 12 +- be/test/olap/segcompaction_mow_test.cpp | 3 +- .../doris/cloud/catalog/CloudReplica.java | 16 + .../doris/service/FrontendServiceImpl.java | 47 +- gensrc/proto/internal_service.proto | 16 + .../test_cloud_version_already_merged.out | 15 + regression-test/pipeline/p0/conf/be.conf | 1 + .../test_cloud_version_already_merged.groovy | 142 ++++++ 41 files changed, 1089 insertions(+), 377 deletions(-) create mode 100644 be/src/olap/rowset_version_mgr.cpp create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 382360bed3af60..15160cfabd497d 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -349,8 +349,9 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t int64_t max_version = cloud_tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { int64_t cur_version = it->rowset_meta()->start_version(); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 7b78033b8e3486..df59a78a58356f 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -150,7 +150,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque // [0-1] is a placeholder rowset, no need to convert RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()}, &rs_splits, - {.skip_missing_version = false, + {.skip_missing_versions = false, .enable_prefer_cached_rowset = false, .query_freshness_tolerance_ms = -1})); } @@ -199,7 +199,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr(); reader_context.version = Version(0, start_resp.alter_version()); std::vector cluster_key_idxes; if (!_base_tablet_schema->cluster_key_uids().empty()) { @@ -509,22 +509,21 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, } // step 1, process incremental rowset without delete bitmap update lock - std::vector incremental_rowsets; RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); int64_t max_version = tmp_tablet->max_version().second; LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); if (max_version >= start_calc_delete_bitmap_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {})); DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", DBUG_BLOCK); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } @@ -540,15 +539,14 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, LOG(INFO) << "alter table for mow table, calculate delete bitmap of " << "incremental rowsets with lock, version: " << max_version + 1 << "-" << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); - std::vector new_incremental_rowsets; if (new_max_version > max_version) { - RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &new_incremental_rowsets)); + auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); } - for (auto rowset : new_incremental_rowsets) { + for (auto rowset : ret.rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index ba404d9138b88e..913f8127bd04d5 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -47,6 +47,7 @@ #include "cpp/sync_point.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" @@ -146,83 +147,53 @@ std::string CloudTablet::tablet_path() const { return ""; } -Status CloudTablet::capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const { - Versions version_path; - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - return st; - } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return _capture_consistent_rowsets_unlocked(version_path, rowsets); -} - Status CloudTablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) { - if (opts.query_freshness_tolerance_ms > 0) { - return capture_rs_readers_with_freshness_tolerance(spec_version, rs_splits, - opts.query_freshness_tolerance_ms); - } else if (opts.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) { - return capture_rs_readers_prefer_cache(spec_version, rs_splits); - } - return capture_rs_readers_internal(spec_version, rs_splits); -} - -Status CloudTablet::capture_rs_readers_internal(const Version& spec_version, - std::vector* rs_splits) { + const CaptureRowsetOps& opts) { DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", { LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id()); return Status::Error(-230, "injected error"); }); - Versions version_path; std::shared_lock rlock(_meta_lock); - auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); - if (!st.ok()) { - rlock.unlock(); // avoid logging in lock range - // Check no missed versions or req version is merged - auto missed_versions = get_missed_versions(spec_version.second); - if (missed_versions.empty()) { - st.set_code(VERSION_ALREADY_MERGED); // Reset error code - st.append(" versions are already compacted, "); - } - st.append(" tablet_id=" + std::to_string(tablet_id())); - // clang-format off - LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }(); - // clang-format on - return st; + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions})); + return Status::OK(); +} + +[[nodiscard]] Result> CloudTablet::capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + if (options.query_freshness_tolerance_ms > 0) { + return capture_versions_with_freshness_tolerance(version_range, options); + } else if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) { + return capture_versions_prefer_cache(version_range); } - VLOG_DEBUG << "capture consitent versions: " << version_path; - return capture_rs_readers_unlocked(version_path, rs_splits); + return BaseTablet::capture_consistent_versions_unlocked(version_range, options); } -Status CloudTablet::capture_rs_readers_prefer_cache(const Version& spec_version, - std::vector* rs_splits) { +Result> CloudTablet::capture_versions_prefer_cache( + const Version& spec_version) const { g_capture_prefer_cache_count << 1; Versions version_path; std::shared_lock rlock(_meta_lock); - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_prefer_cache( + auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache( spec_version, version_path, - [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); })); + [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); }); + if (!st.ok()) { + return ResultError(st); + } int64_t path_max_version = version_path.back().second; VLOG_DEBUG << fmt::format( - "[verbose] CloudTablet::capture_rs_readers_prefer_cache, capture path: {}, " + "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, " "tablet_id={}, spec_version={}, path_max_version={}", fmt::join(version_path | std::views::transform([](const auto& version) { return fmt::format("{}", version.to_string()); }), ", "), tablet_id(), spec_version.to_string(), path_max_version); - return capture_rs_readers_unlocked(version_path, rs_splits); + return version_path; } -bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) { +bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const { if (start_version > end_version) { return false; } @@ -247,11 +218,11 @@ bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t en return is_rowset_warmed_up(rs->rowset_id()); }; -Status CloudTablet::capture_rs_readers_with_freshness_tolerance( - const Version& spec_version, std::vector* rs_splits, - int64_t query_freshness_tolerance_ms) { +Result> CloudTablet::capture_versions_with_freshness_tolerance( + const Version& spec_version, const CaptureRowsetOps& options) const { g_capture_with_freshness_tolerance_count << 1; using namespace std::chrono; + auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms; auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms); // find a version path where every edge(rowset) has been warmuped Versions version_path; @@ -259,15 +230,17 @@ Status CloudTablet::capture_rs_readers_with_freshness_tolerance( if (enable_unique_key_merge_on_write()) { // For merge-on-write table, newly generated delete bitmap marks will be on the rowsets which are in newest layout. // So we can ony capture rowsets which are in newest data layout. Otherwise there may be data correctness issue. - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator_mow( - spec_version, version_path, [&](int64_t start, int64_t end) { - return rowset_is_warmed_up_unlocked(start, end); - })); + RETURN_IF_ERROR_RESULT( + _timestamped_version_tracker.capture_consistent_versions_with_validator_mow( + spec_version, version_path, [&](int64_t start, int64_t end) { + return rowset_is_warmed_up_unlocked(start, end); + })); } else { - RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator( - spec_version, version_path, [&](int64_t start, int64_t end) { - return rowset_is_warmed_up_unlocked(start, end); - })); + RETURN_IF_ERROR_RESULT( + _timestamped_version_tracker.capture_consistent_versions_with_validator( + spec_version, version_path, [&](int64_t start, int64_t end) { + return rowset_is_warmed_up_unlocked(start, end); + })); } int64_t path_max_version = version_path.back().second; auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) -> bool { @@ -309,17 +282,17 @@ Status CloudTablet::capture_rs_readers_with_freshness_tolerance( g_capture_with_freshness_tolerance_fallback_count << 1; // if there exists a rowset which satisfies freshness tolerance and its start version is larger than the path max version // but has not been warmuped up yet, fallback to capture rowsets as usual - return capture_rs_readers_internal(spec_version, rs_splits); + return BaseTablet::capture_consistent_versions_unlocked(spec_version, options); } VLOG_DEBUG << fmt::format( - "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, capture path: {}, " + "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, " "tablet_id={}, spec_version={}, path_max_version={}", fmt::join(version_path | std::views::transform([](const auto& version) { return fmt::format("{}", version.to_string()); }), ", "), tablet_id(), spec_version.to_string(), path_max_version); - return capture_rs_readers_unlocked(version_path, rs_splits); + return version_path; } // There are only two tablet_states RUNNING and NOT_READY in cloud mode diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index e485a401221599..7b35b7c4d3fa94 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -81,18 +81,18 @@ class CloudTablet final : public BaseTablet { bool vertical) override; Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) override; - Status capture_rs_readers_internal(const Version& spec_version, - std::vector* rs_splits); + const CaptureRowsetOps& opts) override; - // Capture rowset readers with cache preference optimization. + [[nodiscard]] Result> capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const override; + + // Capture versions with cache preference optimization. // This method prioritizes using cached/warmed-up rowsets when building version paths, // avoiding cold data reads when possible. It uses capture_consistent_versions_prefer_cache // to find a consistent version path that prefers already warmed-up rowsets. - Status capture_rs_readers_prefer_cache(const Version& spec_version, - std::vector* rs_splits); + Result> capture_versions_prefer_cache(const Version& spec_version) const; - // Capture rowset readers with query freshness tolerance. + // Capture versions with query freshness tolerance. // This method finds a consistent version path where all rowsets are warmed up, // but allows fallback to normal capture if there are newer rowsets that should be // visible (based on freshness tolerance) but haven't been warmed up yet. @@ -102,16 +102,12 @@ class CloudTablet final : public BaseTablet { // data hasn't been warmed up yet. This can cause different tablets in the same query // to read from different versions, potentially leading to inconsistent query results. // - // @param query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that + // @param options.query_freshness_tolerance_ms: Time tolerance in milliseconds. Rowsets that // became visible within this time range (after current_time - query_freshness_tolerance_ms) // can be skipped if not warmed up. However, if older rowsets (before this time point) // are not warmed up, the method will fallback to normal capture. - Status capture_rs_readers_with_freshness_tolerance(const Version& spec_version, - std::vector* rs_splits, - int64_t query_freshness_tolerance_ms); - - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; + Result> capture_versions_with_freshness_tolerance( + const Version& spec_version, const CaptureRowsetOps& options) const; size_t tablet_footprint() override { return _approximate_data_size.load(std::memory_order_relaxed); @@ -352,7 +348,7 @@ class CloudTablet final : public BaseTablet { void add_warmed_up_rowset(const RowsetId& rowset_id); - std::string rowset_warmup_digest() { + std::string rowset_warmup_digest() const { std::string res; auto add_log = [&](const RowsetSharedPtr& rs) { auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string()); @@ -382,7 +378,7 @@ class CloudTablet final : public BaseTablet { std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now()); // used by capture_rs_reader_xxx functions - bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version); + bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const; CloudStorageEngine& _engine; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index d5414b6bac0db5..28f4b60e1da71b 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -159,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) { Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data, bool sync_delete_bitmap, SyncRowsetStats* sync_stats, - bool force_use_cache) { + bool local_only) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` class Value : public LRUCacheValueBase { public: @@ -177,12 +177,17 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CacheKey key(tablet_id_str); auto* handle = _cache->lookup(key); - if (handle == nullptr && force_use_cache) { - return ResultError( - Status::InternalError("failed to get cloud tablet from cache {}", tablet_id)); - } - if (handle == nullptr) { + if (local_only) { + LOG(INFO) << "tablet=" << tablet_id + << "does not exists in local tablet cache, because param local_only=true, " + "treat it as an error"; + return ResultError(Status::InternalError( + "tablet={} does not exists in local tablet cache, because param " + "local_only=true, " + "treat it as an error", + tablet_id)); + } if (sync_stats) { ++sync_stats->tablet_meta_cache_miss; } diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index ee0b807602da6c..d7dde2134acc18 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -47,7 +47,7 @@ class CloudTabletMgr { Result> get_tablet(int64_t tablet_id, bool warmup_data = false, bool sync_delete_bitmap = true, SyncRowsetStats* sync_stats = nullptr, - bool force_use_cache = false); + bool local_only = false); void erase_tablet(int64_t tablet_id); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 32683dc132edd8..a0c589439f27d5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1570,6 +1570,7 @@ DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false"); DEFINE_mBool(enable_update_delete_bitmap_kv_check_core, "false"); +DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false"); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index afba63bebe55a1..aca9ffbf1d14aa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1626,6 +1626,7 @@ DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently); DECLARE_mBool(enable_update_delete_bitmap_kv_check_core); +DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas); // the max length of segments key bounds, in bytes // ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. DECLARE_mInt32(segments_key_bounds_truncation_threshold); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 00a7db1d982b6e..9cde2bbdf45bdc 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1342,37 +1342,6 @@ void BaseTablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, } } -Status BaseTablet::_capture_consistent_rowsets_unlocked( - const std::vector& version_path, std::vector* rowsets) const { - DCHECK(rowsets != nullptr); - rowsets->reserve(version_path.size()); - for (const auto& version : version_path) { - bool is_find = false; - do { - auto it = _rs_version_map.find(version); - if (it != _rs_version_map.end()) { - is_find = true; - rowsets->push_back(it->second); - break; - } - - auto it_expired = _stale_rs_version_map.find(version); - if (it_expired != _stale_rs_version_map.end()) { - is_find = true; - rowsets->push_back(it_expired->second); - break; - } - } while (false); - - if (!is_find) { - return Status::Error( - "fail to find Rowset for version. tablet={}, version={}", tablet_id(), - version.to_string()); - } - } - return Status::OK(); -} - Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version, int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids, @@ -2143,6 +2112,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count( } } +void TabletReadSource::fill_delete_predicates() { + DCHECK_EQ(delete_predicates.size(), 0); + auto delete_pred_view = + rs_splits | std::views::transform([](auto&& split) { + return split.rs_reader->rowset()->rowset_meta(); + }) | + std::views::filter([](const auto& rs_meta) { return rs_meta->has_delete_predicate(); }); + delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()}; +} + int32_t BaseTablet::max_version_config() { int32_t max_version = tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY ? std::max(config::time_series_max_tablet_version_num, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 3e1dbd82053dca..d2762a8a53ec57 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -43,6 +43,10 @@ class CalcDeleteBitmapToken; class SegmentCacheHandle; class RowIdConversion; struct PartialUpdateInfo; +class PartialUpdateReadPlan; +struct CaptureRowsetOps; +struct CaptureRowsetResult; +struct TabletReadSource; class FixedReadPlan; struct TabletWithVersion { @@ -50,28 +54,6 @@ struct TabletWithVersion { int64_t version; }; -struct CaptureRsReaderOptions { - // Used by local mode only. - // If true, allows skipping missing versions during rowset capture. - // This can be useful when some versions are temporarily unavailable. - bool skip_missing_version {false}; - - // ======== only take effect in cloud mode ======== - - // Enable preference for cached/warmed-up rowsets when building version paths. - // When enabled, the capture process will prioritize already cached rowsets - // to avoid cold data reads and improve query performance. - bool enable_prefer_cached_rowset {false}; - - // Query freshness tolerance in milliseconds. - // Defines the time window for considering data as "fresh enough". - // Rowsets that became visible within this time range can be skipped if not warmed up, - // but older rowsets (before current_time - query_freshness_tolerance_ms) that are - // not warmed up will trigger fallback to normal capture. - // Set to -1 to disable freshness tolerance checking. - int64_t query_freshness_tolerance_ms {-1}; -}; - enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING }; // Base class for all tablet classes @@ -130,12 +112,9 @@ class BaseTablet : public std::enable_shared_from_this { virtual Result> create_rowset_writer(RowsetWriterContext& context, bool vertical) = 0; - virtual Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const = 0; - virtual Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) = 0; + const CaptureRowsetOps& opts) = 0; virtual size_t tablet_footprint() = 0; @@ -324,7 +303,7 @@ class BaseTablet : public std::enable_shared_from_this { } void traverse_rowsets_unlocked(std::function visitor, - bool include_stale = false) { + bool include_stale = false) const { for (auto& [v, rs] : _rs_version_map) { visitor(rs); } @@ -353,6 +332,18 @@ class BaseTablet : public std::enable_shared_from_this { void prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version); void prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset); + [[nodiscard]] Result capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] virtual Result> capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result> capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const; + + [[nodiscard]] Result capture_read_source(const Version& version_range, + const CaptureRowsetOps& options); + protected: // Find the missed versions until the spec_version. // @@ -370,11 +361,10 @@ class BaseTablet : public std::enable_shared_from_this { const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del); - Status _capture_consistent_rowsets_unlocked(const std::vector& version_path, - std::vector* rowsets) const; - Status sort_block(vectorized::Block& in_block, vectorized::Block& output_block); + Result _remote_capture_rowsets(const Version& version_range) const; + mutable std::shared_mutex _meta_lock; TimestampedVersionTracker _timestamped_version_tracker; // After version 0.13, all newly created rowsets are saved in _rs_version_map. @@ -413,4 +403,39 @@ class BaseTablet : public std::enable_shared_from_this { Status last_compaction_status = Status::OK(); }; +struct CaptureRowsetOps { + bool skip_missing_versions = false; + bool quiet = false; + bool include_stale_rowsets = true; + bool enable_fetch_rowsets_from_peers = false; + + // ======== only take effect in cloud mode ======== + + // Enable preference for cached/warmed-up rowsets when building version paths. + // When enabled, the capture process will prioritize already cached rowsets + // to avoid cold data reads and improve query performance. + bool enable_prefer_cached_rowset {false}; + + // Query freshness tolerance in milliseconds. + // Defines the time window for considering data as "fresh enough". + // Rowsets that became visible within this time range can be skipped if not warmed up, + // but older rowsets (before current_time - query_freshness_tolerance_ms) that are + // not warmed up will trigger fallback to normal capture. + // Set to -1 to disable freshness tolerance checking. + int64_t query_freshness_tolerance_ms {-1}; +}; + +struct CaptureRowsetResult { + std::vector rowsets; + std::shared_ptr delete_bitmap; +}; + +struct TabletReadSource { + std::vector rs_splits; + std::vector delete_predicates; + std::shared_ptr delete_bitmap; + // Fill delete predicates with `rs_splits` + void fill_delete_predicates(); +}; + } /* namespace doris */ diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 9f2c746ac92eb6..38d2a3fd97af86 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -140,8 +140,9 @@ Status FullCompaction::modify_rowsets() { int64_t max_version = tablet()->max_version().second; DCHECK(max_version >= _output_rowset->version().second); if (max_version > _output_rowset->version().second) { - RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked( - {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); + tmp_rowsets = std::move(ret.rowsets); } for (const auto& it : tmp_rowsets) { diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 5e72bec59c0482..62077b9dd7e27e 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -73,7 +73,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -92,7 +92,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, } reader_params.tablet_schema = merge_tablet_schema; if (!tablet->tablet_schema()->cluster_key_uids().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); } if (stats_output && stats_output->rowid_conversion) { @@ -257,7 +257,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet = tablet; reader_params.reader_type = reader_type; - TabletReader::ReadSource read_source; + TabletReadSource read_source; read_source.rs_splits.reserve(src_rowset_readers.size()); for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { read_source.rs_splits.emplace_back(rs_reader); @@ -277,7 +277,7 @@ Status Merger::vertical_compact_one_group( reader_params.tablet_schema = merge_tablet_schema; bool has_cluster_key = false; if (!tablet->tablet_schema()->cluster_key_uids().empty()) { - reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); has_cluster_key = true; } diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 6b3940bcb87219..9ccfa347651e1c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -23,6 +23,7 @@ #include "cloud/cloud_tablet_hotspot.h" #include "cloud/config.h" #include "common/status.h" +#include "olap/base_tablet.h" #include "olap/rowset/beta_rowset.h" #include "olap/segment_loader.h" #include "pipeline/exec/olap_scan_operator.h" @@ -60,7 +61,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& // `rs_splits` in `entire read source` will be devided into several partitial read sources // to build several parallel scanners, based on segment rows number. All the partitial read sources // share the same delete predicates from their corresponding entire read source. - TabletReader::ReadSource partitial_read_source; + TabletReadSource partitial_read_source; int64_t rows_collected = 0; for (auto& rs_split : entire_read_source.rs_splits) { auto reader = rs_split.rs_reader; @@ -109,10 +110,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& partitial_read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back(_build_scanner( + tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); partitial_read_source = {}; split = RowSetSplits(reader->clone()); @@ -153,9 +155,11 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& split.segment_offsets.second - split.segment_offsets.first); } #endif - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); } } @@ -196,12 +200,14 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list // No row-ranges slicing; scan whole segment i. DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1); - TabletReader::ReadSource partitial_read_source; + TabletReadSource partitial_read_source; partitial_read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - {std::move(partitial_read_source.rs_splits), - entire_read_source.delete_predicates})); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {.rs_splits = std::move(partitial_read_source.rs_splits), + .delete_predicates = entire_read_source.delete_predicates, + .delete_bitmap = entire_read_source.delete_bitmap})); } } } @@ -244,7 +250,7 @@ Status ParallelScannerBuilder::_load() { std::shared_ptr ParallelScannerBuilder::_build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source) { + TabletReadSource&& read_source) { OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), version, std::move(read_source), _limit, _is_preaggregation}; return OlapScanner::create_shared(_parent, std::move(params)); diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 358398847571b7..de1ea33b149e0c 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -22,6 +22,7 @@ #include #include +#include "olap/base_tablet.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/segment_loader.h" @@ -44,7 +45,7 @@ class ParallelScannerBuilder { public: ParallelScannerBuilder(pipeline::OlapScanLocalState* parent, const std::vector& tablets, - std::vector& read_sources, + std::vector& read_sources, const std::shared_ptr& profile, const std::vector& key_ranges, RuntimeState* state, int64_t limit, bool is_dup_mow_key, bool is_preaggregation) @@ -78,7 +79,7 @@ class ParallelScannerBuilder { std::shared_ptr _build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, - TabletReader::ReadSource&& read_source); + TabletReadSource&& read_source); pipeline::OlapScanLocalState* _parent; @@ -111,8 +112,8 @@ class ParallelScannerBuilder { bool _is_preaggregation; std::vector _tablets; std::vector _key_ranges; - std::unordered_map _all_read_sources; - std::vector& _read_sources; + std::unordered_map _all_read_sources; + std::vector& _read_sources; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index c6e8dc718c76c4..67c6db392db67a 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -76,7 +76,7 @@ struct RowsetReaderContext { uint64_t* merged_rows = nullptr; // for unique key merge on write bool enable_unique_key_merge_on_write = false; - const DeleteBitmap* delete_bitmap = nullptr; + DeleteBitmapPtr delete_bitmap = nullptr; bool record_rowids = false; RowIdConversion* rowid_conversion = nullptr; bool is_key_column_group = false; diff --git a/be/src/olap/rowset_version_mgr.cpp b/be/src/olap/rowset_version_mgr.cpp new file mode 100644 index 00000000000000..bb815ce859a24e --- /dev/null +++ b/be/src/olap/rowset_version_mgr.cpp @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "cloud/config.h" +#include "common/status.h" +#include "cpp/sync_point.h" +#include "olap/base_tablet.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader.h" +#include "runtime/client_cache.h" +#include "service/backend_options.h" +#include "service/internal_service.h" +#include "util/brpc_client_cache.h" +#include "util/debug_points.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" + +namespace doris { + +using namespace ErrorCode; +using namespace std::ranges; + +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency( + "remote_fetch_rowsets_single_rpc"); +static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets"); + +[[nodiscard]] Result> BaseTablet::capture_consistent_versions_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + std::vector version_path; + auto st = + _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path); + if (!st && !options.quiet) { + auto missed_versions = get_missed_versions_unlocked(version_range.second); + if (missed_versions.empty()) { + LOG(WARNING) << fmt::format( + "version already has been merged. version_range={}, max_version={}, " + "tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id()); + return ResultError(Status::Error( + "missed versions is empty, version_range={}, max_version={}, tablet_id={}", + version_range.to_string(), _tablet_meta->max_version().second, tablet_id())); + } + LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}", + version_range.to_string(), tablet_id(), st); + _print_missed_versions(missed_versions); + if (!options.skip_missing_versions) { + return ResultError(std::move(st)); + } + LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); + } + DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", { + auto tablet_id = dp->param("tablet_id", -1); + auto skip_by_option = dp->param("skip_by_option", false); + if (skip_by_option && !options.enable_fetch_rowsets_from_peers) { + return version_path; + } + if ((tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id())) || tablet_id == -2) { + return ResultError(Status::Error("version already merged")); + } + }); + return version_path; +} + +[[nodiscard]] Result BaseTablet::capture_consistent_rowsets_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + CaptureRowsetResult result; + auto& rowsets = result.rowsets; + auto maybe_versions = capture_consistent_versions_unlocked(version_range, options); + if (maybe_versions) { + const auto& version_paths = maybe_versions.value(); + rowsets.reserve(version_paths.size()); + + auto rowset_for_version = [&](const Version& version, + bool include_stale) -> Result { + if (auto it = _rs_version_map.find(version); it != _rs_version_map.end()) { + return it->second; + } else { + VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" + << tablet_id() << ", version='" << version.first << "-" + << version.second; + } + if (include_stale) { + if (auto it = _stale_rs_version_map.find(version); + it != _stale_rs_version_map.end()) { + return it->second; + } else { + LOG(WARNING) << fmt::format( + "fail to find Rowset in stale_rs_version for version. tablet={}, " + "version={}-{}", + tablet_id(), version.first, version.second); + } + } + return ResultError(Status::Error( + "failed to find rowset for version={}", version.to_string())); + }; + + for (const auto& version : version_paths) { + auto ret = rowset_for_version(version, options.include_stale_rowsets); + if (!ret) { + return ResultError(std::move(ret.error())); + } + + rowsets.push_back(std::move(ret.value())); + } + if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write()) { + result.delete_bitmap = _tablet_meta->delete_bitmap_ptr(); + } + return result; + } + + if (!config::is_cloud_mode() || !options.enable_fetch_rowsets_from_peers) { + return ResultError(std::move(maybe_versions.error())); + } + auto ret = _remote_capture_rowsets(version_range); + if (!ret) { + auto st = Status::Error( + "version already merged, meet error during remote capturing rowsets, " + "error={}, version_range={}", + ret.error().to_string(), version_range.to_string()); + return ResultError(std::move(st)); + } + return ret; +} + +[[nodiscard]] Result> BaseTablet::capture_rs_readers_unlocked( + const Version& version_range, const CaptureRowsetOps& options) const { + auto maybe_rs_list = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_rs_list) { + return ResultError(std::move(maybe_rs_list.error())); + } + const auto& rs_list = maybe_rs_list.value().rowsets; + std::vector rs_splits; + rs_splits.reserve(rs_list.size()); + for (const auto& rs : rs_list) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + rs_splits.emplace_back(std::move(rs_reader)); + } + return rs_splits; +} + +[[nodiscard]] Result BaseTablet::capture_read_source( + const Version& version_range, const CaptureRowsetOps& options) { + std::shared_lock rdlock(get_header_lock()); + auto maybe_result = capture_consistent_rowsets_unlocked(version_range, options); + if (!maybe_result) { + return ResultError(std::move(maybe_result.error())); + } + auto rowsets_result = std::move(maybe_result.value()); + TabletReadSource read_source; + read_source.delete_bitmap = std::move(rowsets_result.delete_bitmap); + const auto& rowsets = rowsets_result.rowsets; + read_source.rs_splits.reserve(rowsets.size()); + for (const auto& rs : rowsets) { + RowsetReaderSharedPtr rs_reader; + auto st = rs->create_reader(&rs_reader); + if (!st) { + return ResultError(Status::Error( + "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(), + st.to_string())); + } + read_source.rs_splits.emplace_back(std::move(rs_reader)); + } + return read_source; +} + +template +bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) { + auto p_wrap_fn = new auto([=] { fn(args...); }); + auto call_back = [](void* ar) -> void* { + auto f = reinterpret_cast(ar); + (*f)(); + delete f; + return nullptr; + }; + return bthread_start_background(&th, attr, call_back, p_wrap_fn) == 0; +} + +struct GetRowsetsCntl : std::enable_shared_from_this { + struct RemoteGetRowsetResult { + std::vector rowsets; + std::unique_ptr delete_bitmap; + }; + + Status start_req_bg() { + task_cnt = req_addrs.size(); + for (const auto& [ip, port] : req_addrs) { + bthread_t tid; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + + bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() { + LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip; + Defer defer_log {[&ip, port]() { + LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip + << ", port=" << port; + }}; + + PGetTabletRowsetsRequest req; + req.set_tablet_id(self->tablet_id); + req.set_version_start(self->version_range.first); + req.set_version_end(self->version_range.second); + if (self->delete_bitmap_keys.has_value()) { + req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value()); + } + brpc::Controller cntl; + cntl.set_timeout_ms(60000); + cntl.set_max_retry(3); + PGetTabletRowsetsResponse response; + auto start_tm_us = MonotonicMicros(); +#ifndef BE_TEST + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port); + if (stub == nullptr) { + self->result = ResultError(Status::InternalError( + "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port)); + return; + } + stub->get_tablet_rowsets(&cntl, &req, &response, nullptr); +#else + TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response); +#endif + g_remote_fetch_tablet_rowsets_single_request_latency + << MonotonicMicros() - start_tm_us; + + std::unique_lock l(self->butex); + if (self->done) { + return; + } + --self->task_cnt; + auto resp_st = Status::create(response.status()); + DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure", + { resp_st = Status::InternalError("inject error"); }); + if (cntl.Failed() || !resp_st) { + if (self->task_cnt != 0) { + return; + } + std::stringstream reason; + reason << "failed to get rowsets from all replicas, tablet_id=" + << self->tablet_id; + if (cntl.Failed()) { + reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText(); + } else { + reason << ", reason=" << resp_st.to_string(); + } + self->result = ResultError(Status::InternalError(reason.str())); + self->done = true; + self->event.signal(); + return; + } + + Defer done_cb {[&]() { + self->done = true; + self->event.signal(); + }}; + std::vector rs_metas; + for (auto&& rs_pb : response.rowsets()) { + auto rs_meta = std::make_shared(); + if (!rs_meta->init_from_pb(rs_pb)) { + self->result = + ResultError(Status::InternalError("failed to init rowset from pb")); + return; + } + rs_metas.push_back(std::move(rs_meta)); + } + CaptureRowsetResult result; + self->result->rowsets = std::move(rs_metas); + + if (response.has_delete_bitmap()) { + self->result->delete_bitmap = std::make_unique( + DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id)); + } + }); + + if (!succ) { + return Status::InternalError( + "failed to create bthread when request rowsets for tablet={}", tablet_id); + } + } + return Status::OK(); + } + + Result wait_for_ret() { + event.wait(); + return std::move(result); + } + + int64_t tablet_id; + std::vector> req_addrs; + Version version_range; + std::optional delete_bitmap_keys = std::nullopt; + +private: + size_t task_cnt; + + bthread::Mutex butex; + bthread::CountdownEvent event {1}; + bool done = false; + + Result result; +}; + +Result>> get_peer_replicas_addresses( + const int64_t tablet_id) { + auto* cluster_info = ExecEnv::GetInstance()->cluster_info(); + DCHECK_NE(cluster_info, nullptr); + auto master_addr = cluster_info->master_fe_addr; + TGetTabletReplicaInfosRequest req; + req.tablet_ids.push_back(tablet_id); + TGetTabletReplicaInfosResult resp; + auto st = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); }); + if (!st) { + return ResultError(Status::InternalError( + "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(), + tablet_id)); + } + + auto it = resp.tablet_replica_infos.find(tablet_id); + if (it == resp.tablet_replica_infos.end()) { + return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id)); + } + auto replicas = it->second; + auto local_host = BackendOptions::get_localhost(); + bool include_local_host = false; + DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; }); + auto ret_view = + replicas | std::views::filter([&local_host, include_local_host](const auto& replica) { + return local_host.find(replica.host) == std::string::npos || include_local_host; + }) | + std::views::transform([](auto& replica) { + return std::make_pair(std::move(replica.host), replica.brpc_port); + }); + return std::vector(ret_view.begin(), ret_view.end()); +} + +Result BaseTablet::_remote_capture_rowsets( + const Version& version_range) const { + auto start_tm_us = MonotonicMicros(); + Defer defer { + [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }}; +#ifndef BE_TEST + auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id()); +#else + Result>> maybe_be_addresses; + TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses); +#endif + DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail", + { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); }); + if (!maybe_be_addresses) { + return ResultError(std::move(maybe_be_addresses.error())); + } + auto be_addresses = std::move(maybe_be_addresses.value()); + if (be_addresses.empty()) { + LOG(WARNING) << "no peers replica for tablet=" << tablet_id(); + return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id())); + } + + auto cntl = std::make_shared(); + cntl->tablet_id = tablet_id(); + cntl->req_addrs = std::move(be_addresses); + cntl->version_range = version_range; + bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write(); + CaptureRowsetResult result; + if (is_mow) { + result.delete_bitmap = + std::make_unique(_tablet_meta->delete_bitmap().snapshot()); + DeleteBitmapPB delete_bitmap_keys; + auto keyset = result.delete_bitmap->delete_bitmap | + std::views::transform([](const auto& kv) { return kv.first; }); + for (const auto& key : keyset) { + const auto& [rs_id, seg_id, version] = key; + delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string()); + delete_bitmap_keys.mutable_segment_ids()->Add(seg_id); + delete_bitmap_keys.mutable_versions()->Add(version); + } + cntl->delete_bitmap_keys = std::move(delete_bitmap_keys); + } + + RETURN_IF_ERROR_RESULT(cntl->start_req_bg()); + auto maybe_meta = cntl->wait_for_ret(); + if (!maybe_meta) { + auto err = Status::InternalError( + "tried to get rowsets from peer replicas and failed, " + "reason={}", + maybe_meta.error()); + return ResultError(std::move(err)); + } + + auto& remote_meta = maybe_meta.value(); + const auto& rs_metas = remote_meta.rowsets; + for (const auto& rs_meta : rs_metas) { + RowsetSharedPtr rs; + auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs); + if (!st) { + return ResultError(std::move(st)); + } + result.rowsets.push_back(std::move(rs)); + } + if (is_mow) { + DCHECK_NE(result.delete_bitmap, nullptr); + result.delete_bitmap->merge(*remote_meta.delete_bitmap); + } + return result; +} + +} // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 427badee862f40..cc1879fce88b43 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1020,7 +1020,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; - reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); + reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr(); reader_context.version = Version(0, end_version); if (!_base_tablet_schema->cluster_key_uids().empty()) { for (const auto& uid : _base_tablet_schema->cluster_key_uids()) { @@ -1144,9 +1144,8 @@ Status SchemaChangeJob::_get_versions_to_be_changed(std::vector* versio } *max_rowset = rowset; - RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked( - Version(0, rowset->version().second), versions_to_be_changed, false, false)); - + *versions_to_be_changed = DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked( + Version(0, rowset->version().second), {})); return Status::OK(); } @@ -1559,8 +1558,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) << "double write rowsets for version: " << alter_version + 1 << "-" << max_version << " new_tablet=" << _new_tablet->tablet_id(); std::shared_lock rlock(_new_tablet->get_header_lock()); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {alter_version + 1, max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {alter_version + 1, max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto rowset_ptr : rowsets) { std::lock_guard rwlock(_new_tablet->get_rowset_update_lock()); @@ -1578,8 +1578,9 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " << "incremental rowsets for version: " << max_version + 1 << "-" << new_max_version << " new_tablet=" << _new_tablet->tablet_id(); - RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked( - {max_version + 1, new_max_version}, &rowsets)); + auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, CaptureRowsetOps {})); + rowsets = std::move(ret.rowsets); } for (auto&& rowset_ptr : rowsets) { RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr)); diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index dad9e22c0578ed..899a4bef265dd2 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -568,13 +568,22 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet res = check_version_continuity(consistent_rowsets); if (res.ok() && max_cooldowned_version < version) { // Pick consistent rowsets of remaining required version - res = ref_tablet->capture_consistent_rowsets_unlocked( - {max_cooldowned_version + 1, version}, &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked( + {max_cooldowned_version + 1, version}, CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } } else { - // get shortest version path - res = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), - &consistent_rowsets); + auto ret = ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version), + CaptureRowsetOps {}); + if (ret) { + consistent_rowsets = std::move(ret->rowsets); + } else { + res = std::move(ret.error()); + } } if (!res.ok()) { LOG(WARNING) << "fail to select versions to span. res=" << res; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 69791626ab8a1b..de64b3a70cd8f8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -395,12 +395,14 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, } if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) { - calc_bm_status = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, - &calc_delete_bitmap_rowsets); - if (!calc_bm_status.ok()) { - LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << calc_bm_status; + auto ret = capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver, + CaptureRowsetOps {}); + if (!ret) { + LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << ret.error(); + calc_bm_status = std::move(ret.error()); break; } + calc_delete_bitmap_rowsets = std::move(ret->rowsets); // FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter auto self = _engine.tablet_manager()->get_tablet(tablet_id()); CHECK(self); @@ -455,17 +457,16 @@ Status Tablet::revise_tablet_meta(const std::vector& to_add, // that we can capture by version if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { Version full_version = Version(0, max_version_unlocked()); - std::vector expected_rowsets; - auto st = capture_consistent_rowsets_unlocked(full_version, &expected_rowsets); - DCHECK(st.ok()) << st; - DCHECK_EQ(base_rowsets_for_full_clone.size(), expected_rowsets.size()); - if (st.ok() && base_rowsets_for_full_clone.size() != expected_rowsets.size()) - [[unlikely]] { + auto ret = capture_consistent_rowsets_unlocked(full_version, CaptureRowsetOps {}); + DCHECK(ret) << ret.error(); + DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size()); + + if (ret && base_rowsets_for_full_clone.size() != ret->rowsets.size()) [[unlikely]] { LOG(WARNING) << "full clone succeeded, but the count(" << base_rowsets_for_full_clone.size() << ") of base rowsets used for delete bitmap calculation is not match " "expect count(" - << expected_rowsets.size() << ") we capture from tablet meta"; + << ret->rowsets.size() << ") we capture from tablet meta"; } } } @@ -552,7 +553,8 @@ Status Tablet::modify_rowsets(std::vector& to_add, tablet_id()); } else if (rs->rowset_id() != it->second->rowset_id()) { return Status::Error( - "try to delete version {} from {}, but rowset id changed, delete rowset id " + "try to delete version {} from {}, but rowset id changed, delete " + "rowset id " "is {}, exists rowsetid is {}", rs->version().to_string(), tablet_id(), rs->rowset_id().to_string(), it->second->rowset_id().to_string()); @@ -773,10 +775,9 @@ void Tablet::delete_expired_stale_rowset() { Version test_version = Version(0, lastest_delta->end_version()); stale_version_path_map[*path_id_iter] = version_path; - Status status = - capture_consistent_versions_unlocked(test_version, nullptr, false, false); + auto ret = capture_consistent_versions_unlocked(test_version, {}); // 1. When there is no consistent versions, we must reconstruct the tracker. - if (!status.ok()) { + if (!ret) { // 2. fetch missing version after delete Versions after_missed_versions = get_missed_versions_unlocked(lastest_delta->end_version()); @@ -923,51 +924,11 @@ void Tablet::delete_expired_stale_rowset() { { _engine.start_delete_unused_rowset(); }); } -Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, - Versions* version_path, - bool skip_missing_version, bool quiet) const { - Status status = - _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (!status.ok() && !quiet) { - Versions missed_versions = get_missed_versions_unlocked(spec_version.second); - if (missed_versions.empty()) { - // if version_path is null, it may be a compaction check logic. - // so to avoid print too many logs. - if (version_path != nullptr) { - LOG(WARNING) << "tablet:" << tablet_id() - << ", version already has been merged. spec_version: " << spec_version - << ", max_version: " << max_version_unlocked(); - } - status = Status::Error( - "versions are already compacted, spec_version " - "{}, max_version {}, tablet_id {}", - spec_version.second, max_version_unlocked(), tablet_id()); - } else { - if (version_path != nullptr) { - LOG(WARNING) << "status:" << status << ", tablet:" << tablet_id() - << ", missed version for version:" << spec_version; - _print_missed_versions(missed_versions); - if (skip_missing_version) { - LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); - return Status::OK(); - } - } - } - } - - DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", { - auto tablet_id = dp->param("tablet_id", -1); - if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) { - status = Status::Error("version already merged"); - } - }); - - return status; -} - Status Tablet::check_version_integrity(const Version& version, bool quiet) { std::shared_lock rdlock(_meta_lock); - return capture_consistent_versions_unlocked(version, nullptr, false, quiet); + [[maybe_unused]] auto _versions = DORIS_TRY( + capture_consistent_versions_unlocked(version, CaptureRowsetOps {.quiet = quiet})); + return Status::OK(); } bool Tablet::exceed_version_limit(int32_t limit) { @@ -997,22 +958,12 @@ void Tablet::acquire_version_and_rowsets( } } -Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version, - std::vector* rowsets) const { - std::vector version_path; - RETURN_IF_ERROR( - capture_consistent_versions_unlocked(spec_version, &version_path, false, false)); - RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, rowsets)); - return Status::OK(); -} - Status Tablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) { + const CaptureRowsetOps& opts) { std::shared_lock rlock(_meta_lock); std::vector version_path; - RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, &version_path, - opts.skip_missing_version, false)); - RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits)); + *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( + spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions})); return Status::OK(); } @@ -2065,7 +2016,8 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) { std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock); if (!schema_change_lock.owns_lock()) { return Status::Error( - "try schema_change_lock failed, schema change running or inverted index built on " + "try schema_change_lock failed, schema change running or inverted index built " + "on " "this tablet={}", tablet_id()); } @@ -2353,7 +2305,8 @@ Status Tablet::_follow_cooldowned_data() { } } else if (!rs->is_local()) { return Status::InternalError( - "cooldowned version larger than that to follow with cooldown version {}", + "cooldowned version larger than that to follow with cooldown version " + "{}", cooldowned_version); } } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1b7e809eec3f94..372c141cc5c208 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -182,24 +182,15 @@ class Tablet final : public BaseTablet { /// need to delete flag. void delete_expired_stale_rowset(); - // Given spec_version, find a continuous version path and store it in version_path. - // If quiet is true, then only "does this path exist" is returned. - // If skip_missing_version is true, return ok even there are missing versions. - Status capture_consistent_versions_unlocked(const Version& spec_version, Versions* version_path, - bool skip_missing_version, bool quiet) const; - // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets( std::vector>* version_rowsets) const; - Status capture_consistent_rowsets_unlocked( - const Version& spec_version, std::vector* rowsets) const override; - - // If opts.skip_missing_version is true, skip versions if they are missing. + // If skip_missing_version is true, skip versions if they are missing. Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - const CaptureRsReaderOptions& opts) override; + const CaptureRowsetOps& opts) override; // Find the missed versions until the spec_version. // diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 652b8bf6c25927..dc589b9a9925f5 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1254,6 +1254,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) noexcept { return *this; } +DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t tablet_id) { + size_t len = pb.rowset_ids().size(); + DCHECK_EQ(len, pb.segment_ids().size()); + DCHECK_EQ(len, pb.versions().size()); + DeleteBitmap delete_bitmap(tablet_id); + for (int32_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(pb.rowset_ids(i)); + BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)}; + delete_bitmap.delete_bitmap[key] = + roaring::Roaring::read(pb.segment_delete_bitmaps(i).data()); + } + return delete_bitmap; +} + +DeleteBitmapPB DeleteBitmap::to_pb() { + std::shared_lock l(lock); + DeleteBitmapPB ret; + for (const auto& [k, v] : delete_bitmap) { + ret.mutable_rowset_ids()->Add(std::get<0>(k).to_string()); + ret.mutable_segment_ids()->Add(std::get<1>(k)); + ret.mutable_versions()->Add(std::get<2>(k)); + std::string bitmap_data(v.getSizeInBytes(), '\0'); + v.write(bitmap_data.data()); + ret.mutable_segment_delete_bitmaps()->Add(std::move(bitmap_data)); + } + return ret; +} + DeleteBitmap DeleteBitmap::snapshot() const { std::shared_lock l(lock); return DeleteBitmap(*this); @@ -1711,6 +1740,22 @@ std::shared_ptr DeleteBitmap::get_agg_without_cache( return bitmap; } +DeleteBitmap DeleteBitmap::diffset(const std::set& key_set) const { + std::shared_lock l(lock); + auto diff_key_set_view = + delete_bitmap | std::ranges::views::transform([](const auto& kv) { return kv.first; }) | + std::ranges::views::filter( + [&key_set](const auto& key) { return !key_set.contains(key); }); + + DeleteBitmap dbm(_tablet_id); + for (const auto& key : diff_key_set_view) { + const auto* bitmap = get(key); + DCHECK_NE(bitmap, nullptr); + dbm.delete_bitmap[key] = *bitmap; + } + return dbm; +} + std::string tablet_state_name(TabletState state) { switch (state) { case TABLET_NOTREADY: diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index c86d9ad553a6f5..dcefc309a44da6 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -444,6 +444,10 @@ class DeleteBitmap { DeleteBitmap(DeleteBitmap&& r) noexcept; DeleteBitmap& operator=(DeleteBitmap&& r) noexcept; + static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id); + + DeleteBitmapPB to_pb(); + /** * Makes a snapshot of delete bitmap, read lock will be acquired in this * process @@ -608,6 +612,14 @@ class DeleteBitmap { void set_tablet_id(int64_t tablet_id); + /** + * Calculate diffset with given `key_set`. All entries with keys contained in this delete bitmap but not + * in given key_set will be added to the output delete bitmap. + * + * @return Deletebitmap containning all entries in diffset + */ + DeleteBitmap diffset(const std::set& key_set) const; + private: DeleteBitmap::Version _get_rowset_cache_version(const BitmapKey& bmk) const; diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 7eb53414ddbfd5..27668d93fef81c 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -102,16 +102,6 @@ std::string TabletReader::KeysParam::to_string() const { return ss.str(); } -void TabletReader::ReadSource::fill_delete_predicates() { - DCHECK_EQ(delete_predicates.size(), 0); - for (auto&& split : rs_splits) { - auto& rs_meta = split.rs_reader->rowset()->rowset_meta(); - if (rs_meta->has_delete_predicate()) { - delete_predicates.push_back(rs_meta); - } - } -} - TabletReader::~TabletReader() { for (auto* pred : _col_predicates) { delete pred; @@ -680,7 +670,7 @@ Status TabletReader::init_reader_params_and_create_block( reader_params->version = Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version()); - ReadSource read_source; + TabletReadSource read_source; for (const auto& rowset : input_rowsets) { RowsetReaderSharedPtr rs_reader; RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); @@ -702,9 +692,6 @@ Status TabletReader::init_reader_params_and_create_block( merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); } reader_params->tablet_schema = merge_tablet_schema; - if (tablet->enable_unique_key_merge_on_write()) { - reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } reader_params->return_columns.resize(read_tablet_schema->num_columns()); std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0); diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 81fb03ef7b2411..a8179f31773aed 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -33,6 +33,7 @@ #include "common/status.h" #include "exprs/function_filter.h" #include "io/io_common.h" +#include "olap/base_tablet.h" #include "olap/delete_handler.h" #include "olap/filter_olap_param.h" #include "olap/iterators.h" @@ -91,12 +92,6 @@ class TabletReader { }; public: - struct ReadSource { - std::vector rs_splits; - std::vector delete_predicates; - // Fill delete predicates with `rs_splits` - void fill_delete_predicates(); - }; // Params for Reader, // mainly include tablet, data version and fetch range. struct ReaderParams { @@ -117,9 +112,14 @@ class TabletReader { return BeExecVersionManager::get_newest_version(); } - void set_read_source(ReadSource read_source) { + void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { rs_splits = std::move(read_source.rs_splits); delete_predicates = std::move(read_source.delete_predicates); +#ifndef BE_TEST + if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) { + delete_bitmap = std::move(read_source.delete_bitmap); + } +#endif } BaseTabletSPtr tablet; @@ -148,7 +148,7 @@ class TabletReader { std::vector rs_splits; // For unique key table with merge-on-write - DeleteBitmap* delete_bitmap = nullptr; + DeleteBitmapPtr delete_bitmap = nullptr; // return_columns is init from query schema std::vector return_columns; diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 05ecfc0401b6d0..9dbaeabad30d70 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -81,13 +81,15 @@ Status EngineChecksumTask::_compute_checksum() { vectorized::Block block; { std::shared_lock rdlock(tablet->get_header_lock()); - Status acquire_reader_st = - tablet->capture_consistent_rowsets_unlocked(version, &input_rowsets); - if (!acquire_reader_st.ok()) { + auto ret = tablet->capture_consistent_rowsets_unlocked(version, CaptureRowsetOps {}); + if (ret) { + input_rowsets = std::move(ret->rowsets); + } else { LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->tablet_id() - << "res=" << acquire_reader_st; - return acquire_reader_st; + << "res=" << ret.error(); + return std::move(ret.error()); } + RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block( tablet, ReaderType::READER_CHECKSUM, input_rowsets, &reader_params, &block)); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index e7935eae55ff77..497837a9f6bddd 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -33,6 +33,7 @@ #include "common/config.h" #include "common/logging.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -87,8 +88,10 @@ Status EngineStorageMigrationTask::_get_versions(int64_t start_version, int64_t* << ", start_version=" << start_version << ", end_version=" << *end_version; return Status::OK(); } - return _tablet->capture_consistent_rowsets_unlocked(Version(start_version, *end_version), - consistent_rowsets); + auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( + Version(start_version, *end_version), CaptureRowsetOps {})); + *consistent_rowsets = std::move(ret.rowsets); + return Status::OK(); } bool EngineStorageMigrationTask::_is_timeout() { diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 83f0abb498eb94..b8805897f54af4 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -715,16 +715,15 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { } } - CaptureRsReaderOptions opts { - .skip_missing_version = _state->skip_missing_version(), - .enable_prefer_cached_rowset = - config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, - .query_freshness_tolerance_ms = - config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1, - }; for (size_t i = 0; i < _scan_ranges.size(); i++) { - RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers({0, _tablets[i].version}, - &_read_sources[i].rs_splits, opts)); + _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source( + {0, _tablets[i].version}, + {.skip_missing_versions = _state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas, + .enable_prefer_cached_rowset = + config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, + .query_freshness_tolerance_ms = + config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1})); if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { _read_sources[i].fill_delete_predicates(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index f3b37c7fc89ac5..2db857b6e15d40 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -286,7 +286,7 @@ class OlapScanLocalState final : public ScanLocalState { RuntimeProfile::Counter* _variant_subtree_sparse_iter_count = nullptr; std::vector _tablets; - std::vector _read_sources; + std::vector _read_sources; std::map _slot_id_to_virtual_column_expr; std::map _slot_id_to_index_in_block; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2ff23ff97d0ddb..daf45179f79914 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -54,6 +54,9 @@ #include #include +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/config.h" #include "common/exception.h" #include "common/logging.h" @@ -153,6 +156,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit: DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); +static bvar::LatencyRecorder g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets"); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -2318,5 +2323,68 @@ void PInternalService::abort_refresh_dictionary(google::protobuf::RpcController* request->version_id()); st.to_protobuf(response->mutable_status()); } + +void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) { + DCHECK(config::is_cloud_mode()); + auto start_time = GetMonoTimeMicros(); + Defer defer { + [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }}; + brpc::ClosureGuard closure_guard(done); + LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString(); + if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) { + Status::InvalidArgument("missing params tablet/version_start/version_end") + .to_protobuf(response->mutable_status()); + return; + } + CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud(); + + auto maybe_tablet = + storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false, + /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr, + /*local_only*/ true); + if (!maybe_tablet) { + maybe_tablet.error().to_protobuf(response->mutable_status()); + return; + } + auto tablet = maybe_tablet.value(); + Result ret; + { + std::shared_lock l(tablet->get_header_lock()); + ret = tablet->capture_consistent_rowsets_unlocked( + {request->version_start(), request->version_end()}, + CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false}); + } + if (!ret) { + ret.error().to_protobuf(response->mutable_status()); + return; + } + auto rowsets = std::move(ret.value().rowsets); + for (const auto& rs : rowsets) { + RowsetMetaPB meta; + rs->rowset_meta()->to_rowset_pb(&meta); + response->mutable_rowsets()->Add(std::move(meta)); + } + if (request->has_delete_bitmap_keys()) { + DCHECK(tablet->enable_unique_key_merge_on_write()); + auto delete_bitmap = std::move(ret.value().delete_bitmap); + auto keys_pb = request->delete_bitmap_keys(); + size_t len = keys_pb.rowset_ids().size(); + DCHECK_EQ(len, keys_pb.segment_ids().size()); + DCHECK_EQ(len, keys_pb.versions().size()); + std::set keys; + for (size_t i = 0; i < len; ++i) { + RowsetId rs_id; + rs_id.init(keys_pb.rowset_ids(i)); + keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i)); + } + auto diffset = delete_bitmap->diffset(keys).to_pb(); + *response->mutable_delete_bitmap() = std::move(diffset); + } + Status::OK().to_protobuf(response->mutable_status()); +} + #include "common/compile_check_avoid_end.h" } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 5ebef074b960c0..d73501bfc80861 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -227,6 +227,11 @@ class PInternalService : public PBackendService { PAbortRefreshDictionaryResponse* response, google::protobuf::Closure* done) override; + void get_tablet_rowsets(google::protobuf::RpcController* controller, + const PGetTabletRowsetsRequest* request, + PGetTabletRowsetsResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 0d9c0f0698fb27..afb9b546c49bf3 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -65,7 +65,7 @@ namespace doris::vectorized { #include "common/compile_check_avoid_begin.h" -using ReadSource = TabletReader::ReadSource; +using ReadSource = TabletReadSource; OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent, OlapScanner::Params&& params) : Scanner(params.state, parent, params.limit, params.profile), @@ -97,7 +97,8 @@ OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent, OlapScanner::Para .score_runtime {}, .collection_statistics {}, .ann_topn_runtime {}}) { - _tablet_reader_params.set_read_source(std::move(params.read_source)); + _tablet_reader_params.set_read_source(std::move(params.read_source), + _state->skip_delete_bitmap()); _has_prepared = false; _vector_search_params = params.state->get_vector_search_params(); } @@ -218,19 +219,26 @@ Status OlapScanner::prepare() { ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); } - CaptureRsReaderOptions opts { - .skip_missing_version = _state->skip_missing_version(), - .enable_prefer_cached_rowset = - config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, - .query_freshness_tolerance_ms = - config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1, - }; - auto st = tablet->capture_rs_readers(_tablet_reader_params.version, - &read_source.rs_splits, opts); - if (!st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << st; - return st; + auto maybe_read_source = tablet->capture_read_source( + _tablet_reader_params.version, + { + .skip_missing_versions = _state->skip_missing_version(), + .enable_fetch_rowsets_from_peers = + config::enable_fetch_rowsets_from_peer_replicas, + .enable_prefer_cached_rowset = + config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() + : false, + .query_freshness_tolerance_ms = + config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() + : -1, + }); + if (!maybe_read_source) { + LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error(); + return maybe_read_source.error(); } + + read_source = std::move(maybe_read_source.value()); + if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) { LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", tablet->tablet_id(), print_id(_state->query_id())); @@ -357,7 +365,6 @@ Status OlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - auto& tablet = _tablet_reader_params.tablet; auto& tablet_schema = _tablet_reader_params.tablet_schema; // Merge the columns in delete predicate that not in latest schema in to current tablet schema for (auto& del_pred : _tablet_reader_params.delete_predicates) { @@ -417,10 +424,6 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.use_page_cache = _state->enable_page_cache(); - if (tablet->enable_unique_key_merge_on_write() && !_state->skip_delete_bitmap()) { - _tablet_reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); - } - DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); if (!_state->skip_storage_engine_merge()) { diff --git a/be/src/vec/exec/scan/olap_scanner.h b/be/src/vec/exec/scan/olap_scanner.h index 8f14889222b04c..27e09f298172f2 100644 --- a/be/src/vec/exec/scan/olap_scanner.h +++ b/be/src/vec/exec/scan/olap_scanner.h @@ -66,7 +66,7 @@ class OlapScanner : public Scanner { std::vector key_ranges; BaseTabletSPtr tablet; int64_t version; - TabletReader::ReadSource read_source; + TabletReadSource read_source; int64_t limit; bool aggregation; }; diff --git a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp index 8d4d5a37bf7a49..464afb9fc6cf1f 100644 --- a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp +++ b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp @@ -130,12 +130,12 @@ class TestQueryPreferCache : public testing::Test { void check_capture_result(CloudTabletSPtr tablet, Version spec_version, const std::vector& expected_versions) { - std::vector rs_splits; - CaptureRsReaderOptions opts {.skip_missing_version = false, - .enable_prefer_cached_rowset = true, - .query_freshness_tolerance_ms = -1}; - auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts); - ASSERT_TRUE(st.ok()); + CaptureRowsetOps opts {.skip_missing_versions = false, + .enable_prefer_cached_rowset = true, + .query_freshness_tolerance_ms = -1}; + auto res = tablet->capture_read_source(spec_version, opts); + ASSERT_TRUE(res.has_value()); + std::vector rs_splits = std::move(res.value().rs_splits); auto dump_versions = [](const std::vector& expected_versions, const std::vector& splits) { std::vector expected_str; @@ -801,4 +801,4 @@ TEST_F(TestQueryPreferCache, testCapture_4_1) { check_capture_result(tablet, Version {0, 18}, expected_versions); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp index 5fe5c2e51bcc50..1a24ea275be007 100644 --- a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp +++ b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp @@ -128,12 +128,12 @@ class TestFreshnessTolerance : public testing::Test { void check_capture_result(CloudTabletSPtr tablet, Version spec_version, int64_t query_freshness_tolerance_ms, const std::vector& expected_versions) { - std::vector rs_splits; - CaptureRsReaderOptions opts {.skip_missing_version = false, - .enable_prefer_cached_rowset = false, - .query_freshness_tolerance_ms = query_freshness_tolerance_ms}; - auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts); - ASSERT_TRUE(st.ok()); + CaptureRowsetOps opts {.skip_missing_versions = false, + .enable_prefer_cached_rowset = false, + .query_freshness_tolerance_ms = query_freshness_tolerance_ms}; + auto res = tablet->capture_read_source(spec_version, opts); + ASSERT_TRUE(res.has_value()); + std::vector rs_splits = std::move(res.value().rs_splits); auto dump_versions = [](const std::vector& expected_versions, const std::vector& splits) { std::vector expected_str; diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index a7e3c9fd706741..1dc240d2adafd6 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -204,6 +204,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { rowset_writer_context->tablet_schema = tablet_schema; rowset_writer_context->version.first = 10; rowset_writer_context->version.second = 10; + rowset_writer_context->enable_segcompaction = true; TabletMetaSharedPtr tablet_meta = std::make_shared(); tablet_meta->_tablet_id = TABLET_ID; @@ -238,7 +239,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns; reader_context.stats = &_stats; - reader_context.delete_bitmap = delete_bitmap.get(); + reader_context.delete_bitmap = delete_bitmap; Status s; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 7bd487e42c4aad..3fc7c652f589c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -613,4 +613,20 @@ public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimes return; } } + + public List getAllPrimaryBes() { + List result = new ArrayList(); + primaryClusterToBackends.keySet().forEach(clusterId -> { + List backendIds = primaryClusterToBackends.get(clusterId); + if (backendIds == null || backendIds.isEmpty()) { + return; + } + Long beId = backendIds.get(0); + if (beId != -1) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + result.add(backend); + } + }); + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ee578b378ba633..dd3a6836246b48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2781,29 +2781,36 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos LOG.warn("replica {} not normal", replica.getId()); continue; } - Backend backend; - if (Config.isCloudMode() && request.isSetWarmUpJobId()) { + List backends; + if (Config.isCloudMode()) { CloudReplica cloudReplica = (CloudReplica) replica; - // On the cloud, the PrimaryBackend of a tablet indicates the BE where the tablet is stably located, - // while the SecondBackend refers to a BE selected by a new hash when the PrimaryBackend - // is temporarily unavailable. Once the PrimaryBackend recovers, - // the system will switch back to using it. During the preheating phase, - // data needs to be synchronized downstream, which requires a stable BE, - // so the PrimaryBackend is used in this case. - backend = cloudReplica.getPrimaryBackend(clusterId, true); + if (!request.isSetWarmUpJobId()) { + backends = cloudReplica.getAllPrimaryBes(); + } else { + // On the cloud, the PrimaryBackend of a tablet + // indicates the BE where the tablet is stably located, + // while the SecondBackend refers to a BE selected by a new hash when the PrimaryBackend + // is temporarily unavailable. Once the PrimaryBackend recovers, + // the system will switch back to using it. During the preheating phase, + // data needs to be synchronized downstream, which requires a stable BE, + // so the PrimaryBackend is used in this case. + Backend backend = cloudReplica.getPrimaryBackend(clusterId, true); + backends = Lists.newArrayList(backend); + } } else { - backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); + Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); + backends = Lists.newArrayList(backend); } - if (backend != null) { - TReplicaInfo replicaInfo = new TReplicaInfo(); - replicaInfo.setHost(backend.getHost()); - replicaInfo.setBePort(backend.getBePort()); - replicaInfo.setHttpPort(backend.getHttpPort()); - replicaInfo.setBrpcPort(backend.getBrpcPort()); - replicaInfo.setIsAlive(backend.isAlive()); - replicaInfo.setBackendId(backend.getId()); - replicaInfo.setReplicaId(replica.getId()); - replicaInfos.add(replicaInfo); + for (Backend backend : backends) { + if (backend != null) { + TReplicaInfo replicaInfo = new TReplicaInfo(); + replicaInfo.setHost(backend.getHost()); + replicaInfo.setBePort(backend.getBePort()); + replicaInfo.setHttpPort(backend.getHttpPort()); + replicaInfo.setBrpcPort(backend.getBrpcPort()); + replicaInfo.setReplicaId(replica.getId()); + replicaInfos.add(replicaInfo); + } } } tabletReplicaInfos.put(tabletId, replicaInfos); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8bbc80adfbb70a..1ddfbcf2502442 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1066,6 +1066,21 @@ message PAbortRefreshDictionaryResponse { optional PStatus status = 1; } +message PGetTabletRowsetsRequest { + optional int64 tablet_id = 1; + optional int64 version_start = 2; + optional int64 version_end = 3; + + optional DeleteBitmapPB delete_bitmap_keys = 4; +} + +message PGetTabletRowsetsResponse { + required PStatus status = 1; + repeated RowsetMetaPB rowsets = 2; + + optional DeleteBitmapPB delete_bitmap = 3; +} + service PBackendService { // If #fragments of a query is < 3, use exec_plan_fragment directly. // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start @@ -1121,5 +1136,6 @@ service PBackendService { rpc delete_dictionary(PDeleteDictionaryRequest) returns (PDeleteDictionaryResponse); rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns (PCommitRefreshDictionaryResponse); rpc abort_refresh_dictionary(PAbortRefreshDictionaryRequest) returns (PAbortRefreshDictionaryResponse); + rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns (PGetTabletRowsetsResponse); }; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out new file mode 100644 index 00000000000000..78964812ebfb24 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index aa533b0f89f88f..4c22d3bf86971c 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -90,3 +90,4 @@ enable_parquet_page_index=true enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true +enable_fetch_rowsets_from_peer_replicas = true diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy new file mode 100644 index 00000000000000..0dd1f92dc18274 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.util.NodeType + + +suite("test_cloud_version_already_merged", "nonConcurrent") { + if (!isCloudMode()) { + return + } + def tblName = "test_cloud_version_already_merged" + sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tblName} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); + """ + + + sql "insert into ${tblName} values(1,-1,-1,-1);" + sql "insert into ${tblName} values(2,-2,-2,-2);" + sql "insert into ${tblName} values(3,-3,-3,-3);" + sql "insert into ${tblName} values(4,-4,-4,-4)" + sql "insert into ${tblName} values(5,-5,-5,-5)" + sql "insert into ${tblName} values(1,1,1,1);" + sql "insert into ${tblName} values(2,2,2,2);" + sql "insert into ${tblName} values(3,3,3,3);" + sql "insert into ${tblName} values(4,4,4,4)" + sql "insert into ${tblName} values(5,5,5,5)" + + + + + sql "sync;" + qt_sql "select * from ${tblName} order by k1;" + + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${tblName};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + sleep(10000) + + try { + GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure"); + + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail"); + + + test { + sql """ SELECT * from ${tblName} ORDER BY k1 """ + exception "version already merged, meet error during remote capturing rowsets" + } + + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPoint(tabletBackend.Host, tabletBackend.HttpPort as int, NodeType.BE, "Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, skip_by_option: true]) + GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host") + + qt_sql """ SELECT * from ${tblName} ORDER BY k1 """ + + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} +