Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
int64_t txn_expiration;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info);
&partial_update_info, &publish_status);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
Expand All @@ -172,8 +173,16 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration);
auto update_delete_bitmap_time_us = MonotonicMicros() - t3;
txn_info.publish_status = publish_status;
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) {
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id,
txn_expiration);
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
Expand Down
41 changes: 41 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <type_traits>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
Expand All @@ -50,6 +51,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -546,6 +548,36 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
}
}

// Tries to assemble the delete bitmap for `rs_metas` from the local txn delete
// bitmap cache instead of fetching it remotely.
//
// For each distinct txn id found in `rs_metas`, the cached delete bitmap of
// (txn_id, tablet) is merged into `delete_bitmap`, but only when the cache lookup
// succeeds and the cached publish status is SUCCEED. A cache entry that has been
// merged is removed from the cache right away.
//
// Returns true when every txn could be served from the cache. Returns false on the
// first txn that could not; in that case `delete_bitmap` may already contain
// bitmaps merged for earlier txns, so the caller must not use it as-is.
//
// `old_max_version` is not used by this function.
bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
                                                      std::ranges::range auto&& rs_metas,
                                                      DeleteBitmap* delete_bitmap) {
    const auto tablet_id = tablet->tablet_id();
    std::set<int64_t> txn_processed;
    for (auto& rs_meta : rs_metas) {
        auto txn_id = rs_meta.txn_id();
        // insert() reports whether the id was new, so one lookup is enough to
        // skip rowsets that belong to an already handled txn.
        if (!txn_processed.insert(txn_id).second) {
            continue;
        }
        DeleteBitmapPtr tmp_delete_bitmap;
        RowsetIdUnorderedSet tmp_rowset_ids;
        std::shared_ptr<PublishStatus> publish_status =
                std::make_shared<PublishStatus>(PublishStatus::INIT);
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
        auto& txn_cache = engine.txn_delete_bitmap_cache();
        Status status = txn_cache.get_delete_bitmap(txn_id, tablet_id, &tmp_delete_bitmap,
                                                    &tmp_rowset_ids, &publish_status);
        // get_delete_bitmap() overwrites both out pointers; guard against a null
        // result so a malformed cache entry falls back to the slow path instead of
        // crashing here.
        const bool usable = status.ok() && publish_status != nullptr &&
                            *publish_status == PublishStatus::SUCCEED &&
                            tmp_delete_bitmap != nullptr;
        if (!usable) {
            LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet_id
                         << ", txn_id=" << txn_id << ", status=" << status;
            return false;
        }
        delete_bitmap->merge(*tmp_delete_bitmap);
        txn_cache.remove_unused_tablet_txn_info(txn_id, tablet_id);
    }
    return true;
}

Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx,
Expand All @@ -554,6 +586,15 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
return Status::OK();
}

if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
<< tablet->tablet_id();
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class CloudMetaMgr {
int64_t initiator);

private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap);

Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids);
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE);

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
rowset_writer->num_rows() > 0) {
Expand All @@ -626,6 +626,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED);

return Status::OK();
}
Expand Down
58 changes: 55 additions & 3 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ Status CloudTxnDeleteBitmapCache::init() {
Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info) {
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status) {
{
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey key(transaction_id, tablet_id);
Expand All @@ -66,6 +67,26 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*rowset = iter->second.rowset;
*txn_expiration = iter->second.txn_expiration;
*partial_update_info = iter->second.partial_update_info;
*publish_status = iter->second.publish_status;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
return Status::OK();
}

Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>* publish_status) {
if (publish_status) {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
auto iter = _txn_map.find(txn_key);
if (iter == _txn_map.end()) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
*publish_status = iter->second.publish_status;
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
Expand Down Expand Up @@ -103,7 +124,10 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
_txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info));
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
_txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info),
std::move(publish_status));
_expiration_txn.emplace(txn_expiration, txn_key);
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
Expand All @@ -128,7 +152,14 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids) {
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.count(txn_key) > 0);
*(_txn_map[txn_key].publish_status.get()) = publish_status;
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);

Expand All @@ -152,6 +183,10 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
while (!_expiration_txn.empty()) {
auto iter = _expiration_txn.begin();
if (_txn_map.find(iter->second) == _txn_map.end()) {
_expiration_txn.erase(iter);
continue;
}
int64_t current_time = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
Expand All @@ -174,6 +209,23 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
}
}

// Removes the cached state of (transaction_id, tablet_id), if any: the LRU cache
// entry keyed by "<txn_id>/<tablet_id>" and the matching `_txn_map` entry.
// Does nothing when the txn is not present in `_txn_map`.
//
// The whole operation runs under the exclusive `_rwlock`. The corresponding
// `_expiration_txn` entry is intentionally left in place here.
void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId transaction_id,
                                                              int64_t tablet_id) {
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
    auto txn_iter = _txn_map.find(TxnKey(transaction_id, tablet_id));
    if (txn_iter == _txn_map.end()) {
        return;
    }
    LOG_INFO("remove unused tablet txn info")
            .tag("txn_id", txn_iter->first.txn_id)
            .tag("tablet_id", txn_iter->first.tablet_id);
    // Same "<txn_id>/<tablet_id>" key layout that the rest of this cache uses.
    // key_str must outlive cache_key, hence the named local.
    std::string key_str =
            fmt::format("{}/{}", txn_iter->first.txn_id, txn_iter->first.tablet_id);
    CacheKey cache_key(key_str);
    erase(cache_key);
    // Erase through the iterator we already hold; no second map lookup needed.
    _txn_map.erase(txn_iter);
}

void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
Expand Down
20 changes: 16 additions & 4 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
#include "util/countdown_latch.h"

namespace doris {
Expand All @@ -40,7 +41,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
Status get_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info);
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status);

void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
Expand All @@ -49,10 +51,17 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {

void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids);
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status);

void remove_expired_tablet_txn_info();

void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id);

Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids,
std::shared_ptr<PublishStatus>* publish_status);

private:
void _clean_thread_callback();

Expand Down Expand Up @@ -80,12 +89,15 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
RowsetSharedPtr rowset;
int64_t txn_expiration;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status = nullptr;
TxnVal() : txn_expiration(0) {};
TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
std::shared_ptr<PartialUpdateInfo> partial_update_info_)
std::shared_ptr<PartialUpdateInfo> partial_update_info_,
std::shared_ptr<PublishStatus> publish_status_)
: rowset(std::move(rowset_)),
txn_expiration(txn_expiration_),
partial_update_info(std::move(partial_update_info_)) {}
partial_update_info(std::move(partial_update_info_)),
publish_status(std::move(publish_status_)) {}
};

std::map<TxnKey, TxnVal> _txn_map;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum class TxnState {
ABORTED = 4,
DELETED = 5,
};
// Progress of publishing a txn's delete bitmap, tracked per (txn, tablet).
enum class PublishStatus {
    // Initial state of a freshly registered txn.
    INIT = 0,
    // A delete bitmap has been computed and cached, but not yet confirmed as saved.
    PREPARE = 1,
    // The delete bitmap was saved successfully; it can be reused without recalculation.
    SUCCEED = 2,
};

struct TabletTxnInfo {
PUniqueId load_id;
Expand All @@ -73,6 +74,7 @@ struct TabletTxnInfo {
int64_t creation_time;
bool ingest {false};
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
TxnState state {TxnState::PREPARED};

TabletTxnInfo() = default;
Expand Down