Skip to content
Merged
1 change: 1 addition & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,7 @@ void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskReq
finish_task_request.__set_signature(req.signature);
finish_task_request.__set_report_version(s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions);

finish_task(finish_task_request);
remove_task_info(req.task_type, req.signature);
Expand Down
16 changes: 14 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
int64_t txn_expiration;
TxnPublishInfo previous_publish_info;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info, &publish_status);
&partial_update_info, &publish_status, &previous_publish_info);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
Expand All @@ -204,8 +205,19 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
txn_info.publish_status = publish_status;
txn_info.publish_info = {.publish_version = _version,
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) {
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
_version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
_ms_cumulative_compaction_cnt == previous_publish_info.cumulative_compaction_cnt &&
_ms_cumulative_point == previous_publish_info.cumulative_point) {
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
Expand Down
23 changes: 19 additions & 4 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,29 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
}
txn_processed.insert(txn_id);
DeleteBitmapPtr tmp_delete_bitmap;
RowsetIdUnorderedSet tmp_rowset_ids;
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status);
if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) {
delete_bitmap->merge(*tmp_delete_bitmap);
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status);
// CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services.
// If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can
// use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other
// stats(version or compaction stats)
if (status.ok() && *publish_status == PublishStatus::SUCCEED) {
// tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap.
// Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON,
// we should replace it with the rowset's real version
DCHECK(rs_meta.start_version() == rs_meta.end_version());
int64_t rowset_version = rs_meta.start_version();
for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) {
// skip sentinel mark, which is used for delete bitmap correctness check
if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
delete_bitmap->merge({std::get<0>(delete_bitmap_key),
std::get<1>(delete_bitmap_key), rowset_version},
bitmap_value);
}
}
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
Expand Down
9 changes: 7 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,13 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED);

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info);

return Status::OK();
}
Expand Down
19 changes: 14 additions & 5 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"

namespace doris {

Expand Down Expand Up @@ -54,7 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status) {
std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* previous_publish_info) {
{
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey key(transaction_id, tablet_id);
Expand All @@ -68,6 +69,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*txn_expiration = iter->second.txn_expiration;
*partial_update_info = iter->second.partial_update_info;
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
Expand Down Expand Up @@ -96,7 +98,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
*delete_bitmap = val->delete_bitmap;
*rowset_ids = val->rowset_ids;
if (rowset_ids) {
*rowset_ids = val->rowset_ids;
}
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
Expand Down Expand Up @@ -153,12 +157,17 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status) {
PublishStatus publish_status,
TxnPublishInfo publish_info) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.count(txn_key) > 0);
*(_txn_map[txn_key].publish_status.get()) = publish_status;
CHECK(_txn_map.contains(txn_key));
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
txn_val.publish_info = publish_info;
}
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
Expand Down
11 changes: 9 additions & 2 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status);
std::shared_ptr<PublishStatus>* publish_status,
TxnPublishInfo* previous_publish_info);

void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
Expand All @@ -52,12 +53,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status);
PublishStatus publish_status, TxnPublishInfo publish_info = {});

void remove_expired_tablet_txn_info();

void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id);

// !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks,
// and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON.
// when using delete bitmap from this cache, the caller should manually remove these marks if don't need it
// and should replace versions in BitmapKey by the correct version
Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids,
std::shared_ptr<PublishStatus>* publish_status);
Expand Down Expand Up @@ -88,6 +93,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
int64_t txn_expiration;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status = nullptr;
// used to determine if the retry needs to re-calculate the delete bitmap
TxnPublishInfo publish_info;
TxnVal() : txn_expiration(0) {};
TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
std::shared_ptr<PartialUpdateInfo> partial_update_info_,
Expand Down
28 changes: 16 additions & 12 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1208,17 +1208,6 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
return Status::OK();
}

void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap) {
for (auto it = delete_bitmap->delete_bitmap.begin(), end = delete_bitmap->delete_bitmap.end();
it != end;) {
if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
it = delete_bitmap->delete_bitmap.erase(it);
} else {
++it;
}
}
}

Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info,
int64_t txn_id, int64_t txn_expiration) {
SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
Expand Down Expand Up @@ -1296,6 +1285,21 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
}
}

DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", {
auto token = dp->param<std::string>("token", "invalid_token");
while (DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) {
auto block_dp = DebugPoints::instance()->get_debug_point(
"BaseTablet::update_delete_bitmap.block");
if (block_dp) {
auto wait_token = block_dp->param<std::string>("wait_token", "");
if (wait_token != token) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});

if (!rowsets_skip_alignment.empty()) {
auto token = self->calc_delete_bitmap_executor()->create_token();
// set rowset_writer to nullptr to skip the alignment process
Expand Down Expand Up @@ -1544,7 +1548,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
if (!st.ok()) {
LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
}
self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
delete_bitmap->remove_sentinel_marks();
}
for (auto& iter : delete_bitmap->delete_bitmap) {
self->_tablet_meta->delete_bitmap().merge(
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ class BaseTablet {
static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur,
const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del);
static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap);

Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
std::vector<RowsetSharedPtr>* rowsets) const;
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,16 @@ bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row
return false;
}

void DeleteBitmap::remove_sentinel_marks() {
for (auto it = delete_bitmap.begin(), end = delete_bitmap.end(); it != end;) {
if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
it = delete_bitmap.erase(it);
} else {
++it;
}
}
}

int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
std::lock_guard l(lock);
auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ class DeleteBitmap {
*/
std::shared_ptr<roaring::Roaring> get_agg(const BitmapKey& bmk) const;

void remove_sentinel_marks();

class AggCachePolicy : public LRUCachePolicyTrackingManual {
public:
AggCachePolicy(size_t capacity)
Expand Down
32 changes: 24 additions & 8 deletions be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ enum class TxnState {
};
enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 };

struct TxnPublishInfo {
int64_t publish_version {-1};
int64_t base_compaction_cnt {-1};
int64_t cumulative_compaction_cnt {-1};
int64_t cumulative_point {-1};
};

struct TabletTxnInfo {
PUniqueId load_id;
RowsetSharedPtr rowset;
Expand All @@ -74,24 +81,33 @@ struct TabletTxnInfo {
int64_t creation_time;
bool ingest {false};
std::shared_ptr<PartialUpdateInfo> partial_update_info;

// for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask
// needs to re-calculate the delete bitmap
std::shared_ptr<PublishStatus> publish_status;
TxnState state {TxnState::PREPARED};
TxnPublishInfo publish_info;

TxnState state {TxnState::PREPARED};
TabletTxnInfo() = default;

TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
: load_id(std::move(load_id)),
rowset(std::move(rowset)),
creation_time(UnixSeconds()) {}

TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
: load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {}
: load_id(std::move(load_id)),
rowset(std::move(rowset)),
creation_time(UnixSeconds()),
ingest(ingest_arg) {}

TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write,
DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids)
: load_id(load_id),
rowset(rowset),
DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids)
: load_id(std::move(load_id)),
rowset(std::move(rowset)),
unique_key_merge_on_write(merge_on_write),
delete_bitmap(delete_bitmap),
rowset_ids(ids),
delete_bitmap(std::move(delete_bitmap)),
rowset_ids(std::move(ids)),
creation_time(UnixSeconds()) {}

void prepare() { state = TxnState::PREPARED; }
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,17 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request)
"backend: " + task.getBackendId() + ", error_tablet_size: "
+ request.getErrorTabletIdsSize() + ", err_msg: "
+ request.getTaskStatus().getErrorMsgs().toString());
} else if (request.isSetRespPartitions()
&& calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's"
+ "partitionInfos: {}. response's partitionInfos: {}", task.getBackendId(),
request.getReportVersion(),
calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos().toString(),
request.getRespPartitions().toString());
// DELETE_BITMAP_LOCK_ERROR will be retried
calcDeleteBitmapTask.countDownToZero(TStatusCode.DELETE_BITMAP_LOCK_ERROR,
"get staled response from backend " + task.getBackendId() + ", report version: "
+ request.getReportVersion());
} else {
calcDeleteBitmapTask.countDownLatch(task.getBackendId(), calcDeleteBitmapTask.getTransactionId());
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public void countDownToZero(TStatusCode code, String errMsg) {
}
}

public boolean isFinishRequestStale(List<TCalcDeleteBitmapPartitionInfo> respPartitionInfos) {
return !respPartitionInfos.equals(partitionInfos);
}

public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
this.latch = latch;
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/MasterService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct TFinishTaskRequest {
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
18: optional map<i64, i64> table_id_to_delta_num_rows
19: optional map<i64, map<i64, i64>> table_id_to_tablet_id_to_delta_num_rows
// for Cloud mow table only, used by FE to check if the response is for the latest request
20: optional list<AgentService.TCalcDeleteBitmapPartitionInfo> resp_partitions;
}

struct TTablet {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 999 999
2 888 888
3 3 3

Loading