diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index b9269adf1a7a95..723bcdb48faffd 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -154,10 +154,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { DeleteBitmapPtr delete_bitmap; RowsetIdUnorderedSet rowset_ids; std::shared_ptr partial_update_info; + std::shared_ptr publish_status; int64_t txn_expiration; Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info( _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration, - &partial_update_info); + &partial_update_info, &publish_status); if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << status; @@ -172,8 +173,16 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.delete_bitmap = delete_bitmap; txn_info.rowset_ids = rowset_ids; txn_info.partial_update_info = partial_update_info; - status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration); - auto update_delete_bitmap_time_us = MonotonicMicros() - t3; + txn_info.publish_status = publish_status; + auto update_delete_bitmap_time_us = 0; + if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) { + LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id + << ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap."; + } else { + status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, + txn_expiration); + update_delete_bitmap_time_us = MonotonicMicros() - t3; + } if (status != Status::OK()) { LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id() << ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index d55c884a6c2b7d..f0a377cba67dd1 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -35,6 +35,7 @@ #include #include +#include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "cloud/config.h" #include "cloud/pb_convert.h" @@ -50,6 +51,7 @@ #include "olap/olap_common.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_factory.h" +#include "olap/storage_engine.h" #include "olap/tablet_meta.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -546,6 +548,36 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ } } +bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version, + std::ranges::range auto&& rs_metas, + DeleteBitmap* delete_bitmap) { + std::set txn_processed; + for (auto& rs_meta : rs_metas) { + auto txn_id = rs_meta.txn_id(); + if (txn_processed.find(txn_id) != txn_processed.end()) { + continue; + } + txn_processed.insert(txn_id); + DeleteBitmapPtr tmp_delete_bitmap; + RowsetIdUnorderedSet tmp_rowset_ids; + std::shared_ptr publish_status = + std::make_shared(PublishStatus::INIT); + CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); + Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( + txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status); + if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) { + delete_bitmap->merge(*tmp_delete_bitmap); + engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, + tablet->tablet_id()); + } else { + LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet->tablet_id() + << ", txn_id=" << txn_id << ", status=" << status; + return false; + } + } + return true; +} + Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, @@ -554,6 +586,15 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ return Status::OK(); } + if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { + return Status::OK(); + } else { + LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id=" + << tablet->tablet_id(); + DeleteBitmapPtr new_delete_bitmap = std::make_shared(tablet->tablet_id()); + *delete_bitmap = *new_delete_bitmap; + } + std::shared_ptr stub; RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 0164cf57517ed5..6f6cc9c26b47b4 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -92,6 +92,10 @@ class CloudMetaMgr { int64_t initiator); private: + bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version, + std::ranges::range auto&& rs_metas, + DeleteBitmap* delete_bitmap); + Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 587b1638a00ead..c09d57e500e573 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -604,8 +604,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RowsetSharedPtr rowset = txn_info->rowset; int64_t cur_version = rowset->start_version(); // update delete bitmap info, in order to avoid recalculation when trying again - _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, - cur_rowset_ids); + _engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE); if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update && rowset_writer->num_rows() > 0) { @@ -626,6 +626,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get())); + _engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 5a242f9af3a6c7..64faf915e612e2 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -53,7 +53,8 @@ Status CloudTxnDeleteBitmapCache::init() { Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, - std::shared_ptr* partial_update_info) { + std::shared_ptr* partial_update_info, + std::shared_ptr* publish_status) { { std::shared_lock rlock(_rwlock); TxnKey key(transaction_id, tablet_id); @@ -66,6 +67,26 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( *rowset = iter->second.rowset; *txn_expiration = iter->second.txn_expiration; *partial_update_info = iter->second.partial_update_info; + *publish_status = iter->second.publish_status; + } + RETURN_IF_ERROR( + get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr)); + return Status::OK(); +} + +Status CloudTxnDeleteBitmapCache::get_delete_bitmap( + TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap, + RowsetIdUnorderedSet* rowset_ids, std::shared_ptr* publish_status) { + if (publish_status) { + std::shared_lock rlock(_rwlock); + TxnKey txn_key(transaction_id, tablet_id); + auto iter = _txn_map.find(txn_key); + if (iter == _txn_map.end()) { + return Status::Error( + "not found txn info, tablet_id={}, transaction_id={}", tablet_id, + transaction_id); + } + *publish_status = iter->second.publish_status; } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); CacheKey key(key_str); @@ -103,7 +124,10 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); - _txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info)); + std::shared_ptr publish_status = + std::make_shared(PublishStatus::INIT); + _txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info), + std::move(publish_status)); _expiration_txn.emplace(txn_expiration, txn_key); } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); @@ -128,7 +152,14 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids) { + const RowsetIdUnorderedSet& rowset_ids, + PublishStatus publish_status) { + { + std::unique_lock wlock(_rwlock); + TxnKey txn_key(transaction_id, tablet_id); + CHECK(_txn_map.count(txn_key) > 0); + *(_txn_map[txn_key].publish_status.get()) = publish_status; + } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); CacheKey key(key_str); @@ -152,6 +183,10 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() { std::unique_lock wlock(_rwlock); while (!_expiration_txn.empty()) { auto iter = _expiration_txn.begin(); + if (_txn_map.find(iter->second) == _txn_map.end()) { + _expiration_txn.erase(iter); + continue; + } int64_t current_time = duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); @@ -174,6 +209,23 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() { } } +void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId transaction_id, + int64_t tablet_id) { + std::unique_lock wlock(_rwlock); + TxnKey txn_key(transaction_id, tablet_id); + auto txn_iter = _txn_map.find(txn_key); + if (txn_iter != _txn_map.end()) { + LOG_INFO("remove unused tablet txn info") + .tag("txn_id", txn_iter->first.txn_id) + .tag("tablt_id", txn_iter->first.tablet_id); + std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" + + std::to_string(txn_iter->first.tablet_id); // Cache key container + CacheKey cache_key(key_str); + erase(cache_key); + _txn_map.erase(txn_key); + } +} + void CloudTxnDeleteBitmapCache::_clean_thread_callback() { do { remove_expired_tablet_txn_info(); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 3b1d1d1d85760d..71d5123f34dcc8 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -24,6 +24,7 @@ #include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/tablet_meta.h" +#include "olap/txn_manager.h" #include "util/countdown_latch.h" namespace doris { @@ -40,7 +41,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy { Status get_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, - std::shared_ptr* partial_update_info); + std::shared_ptr* partial_update_info, + std::shared_ptr* publish_status); void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, @@ -49,10 +51,17 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy { void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids); + const RowsetIdUnorderedSet& rowset_ids, + PublishStatus publish_status); void remove_expired_tablet_txn_info(); + void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id); + + Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id, + DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, + std::shared_ptr* publish_status); + private: void _clean_thread_callback(); @@ -80,12 +89,15 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy { RowsetSharedPtr rowset; int64_t txn_expiration; std::shared_ptr partial_update_info; + std::shared_ptr publish_status = nullptr; TxnVal() : txn_expiration(0) {}; TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_, - std::shared_ptr partial_update_info_) + std::shared_ptr partial_update_info_, + std::shared_ptr publish_status_) : rowset(std::move(rowset_)), txn_expiration(txn_expiration_), - partial_update_info(std::move(partial_update_info_)) {} + partial_update_info(std::move(partial_update_info_)), + publish_status(std::move(publish_status_)) {} }; std::map _txn_map; diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 724255bccafa53..9fd528b91a7e3d 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -61,6 +61,7 @@ enum class TxnState { ABORTED = 4, DELETED = 5, }; +enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 }; struct TabletTxnInfo { PUniqueId load_id; @@ -73,6 +74,7 @@ struct TabletTxnInfo { int64_t creation_time; bool ingest {false}; std::shared_ptr partial_update_info; + std::shared_ptr publish_status; TxnState state {TxnState::PREPARED}; TabletTxnInfo() = default;