Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {

// we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns
RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap));
RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap,
_version));

LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
Expand Down
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,9 @@ Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema&
}

Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator, DeleteBitmap* delete_bitmap) {
int64_t initiator, DeleteBitmap* delete_bitmap,
int64_t txn_id, bool is_explicit_txn,
int64_t next_visible_version) {
VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
Expand All @@ -1151,6 +1153,13 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
req.set_tablet_id(tablet.tablet_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
req.set_is_explicit_txn(is_explicit_txn);
if (txn_id > 0) {
req.set_txn_id(txn_id);
}
if (next_visible_version > 0) {
req.set_next_visible_version(next_visible_version);
}
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class CloudMetaMgr {
Status update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema);

Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator,
DeleteBitmap* delete_bitmap);
DeleteBitmap* delete_bitmap, int64_t txn_id = -1,
bool is_explicit_txn = false, int64_t next_visible_version = -1);

Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap);
Expand Down
12 changes: 8 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {

Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
Expand All @@ -715,7 +716,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap));
RETURN_IF_ERROR(
save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, next_visible_version));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
Expand Down Expand Up @@ -745,7 +747,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap) {
DeleteBitmapPtr delete_bitmap,
int64_t next_visible_version) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
Expand All @@ -758,7 +761,8 @@ Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id
}

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));
new_delete_bitmap.get(), txn_id, false,
next_visible_version));
return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ class CloudTablet final : public BaseTablet {

Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) override;
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t next_visible_version = -1) override;

Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap);
DeleteBitmapPtr delete_bitmap, int64_t next_visible_version);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,8 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
auto t5 = watch.get_elapse_time_us();
RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
transient_rs_writer.get(), cur_rowset_ids));
transient_rs_writer.get(), cur_rowset_ids,
cur_version));

// defensive check, check that the delete bitmap cache we wrote is correct
RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id, delete_bitmap.get()));
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ class BaseTablet {

static Status update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info,
int64_t txn_id, int64_t txn_expiration = 0);

virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) = 0;
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t next_visible_version = -1) = 0;
virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;

void calc_compaction_output_rowset_delete_bitmap(
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2483,7 +2483,8 @@ CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() {

Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ class Tablet final : public BaseTablet {
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) override;
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t next_visible_version = -1) override;

void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
bool check_all_rowset_segment();
Expand Down
114 changes: 112 additions & 2 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,106 @@ static bool process_pending_delete_bitmap(MetaServiceCode& code, std::string& ms
return true;
}

// When a load txn retries in publish phase with different version to publish, it will gain delete bitmap lock
// many times. these locks are *different*, but they are the same in the current implementation because they have
// the same lock_id and initiator and don't have version info. If some delete bitmap calculation task with version X
// on BE lasts long and try to update delete bitmaps on MS when the txn gains the lock in later retries
// with version Y(Y > X) to publish. It may wrongly update version X's delete bitmaps because the lock don't have version info.
//
// This function checks whether the partition version is correct when updating the delete bitmap
// to avoid wrongly update an visible version's delete bitmaps.
// 1. get the db id with txn id
// 2. get the partition version with db id, table id and partition id
// 3. check if the partition version matches the updating version
static bool check_partition_version_when_update_delete_bitmap(
MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>& txn,
std::string& instance_id, int64_t table_id, int64_t partition_id, int64_t tablet_id,
int64_t txn_id, int64_t next_visible_version) {
if (partition_id <= 0) {
LOG(WARNING) << fmt::format(
"invalid partition_id, skip to check partition version. txn={}, "
"table_id={}, partition_id={}, tablet_id={}",
txn_id, table_id, partition_id, tablet_id);
return true;
}
// Get db id with txn id
std::string index_val;
const std::string index_key = txn_index_key({instance_id, txn_id});
auto err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err);
LOG(WARNING) << msg;
return false;
}

TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
LOG(WARNING) << msg;
return false;
}

DCHECK(index_pb.has_tablet_index())
<< fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}",
txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb));
DCHECK(index_pb.tablet_index().has_db_id())
<< fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}",
txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb));
if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
LOG(WARNING) << fmt::format(
"has no db_id in TxnIndexPB, skip to check partition version. txn={}, "
"table_id={}, partition_id={}, tablet_id={}, index_pb={}",
txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb));
return true;
}
int64_t db_id = index_pb.tablet_index().db_id();

std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
std::string ver_val;
err = txn->get(ver_key, &ver_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get partition version, txn_id={}, tablet={}, err={}", txn_id,
tablet_id, err);
LOG(WARNING) << msg;
return false;
}

int64_t cur_max_version {-1};
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
cur_max_version = 1;
} else {
VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("failed to parse version_pb, txn_id={}, tablet={}, key={}", txn_id,
tablet_id, hex(ver_key));
LOG(WARNING) << msg;
return false;
}
DCHECK(version_pb.has_version());
cur_max_version = version_pb.version();

if (version_pb.pending_txn_ids_size() > 0) {
DCHECK(version_pb.pending_txn_ids_size() == 1);
cur_max_version += version_pb.pending_txn_ids_size();
}
}

if (cur_max_version + 1 != next_visible_version) {
code = MetaServiceCode::VERSION_NOT_MATCH;
msg = fmt::format(
"check version failed when update_delete_bitmap, txn={}, table_id={}, "
"partition_id={}, tablet_id={}, found partition's max version is {}, but "
"request next_visible_version is {}",
txn_id, table_id, partition_id, tablet_id, cur_max_version, next_visible_version);
return false;
}
return true;
}

void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* controller,
const UpdateDeleteBitmapRequest* request,
UpdateDeleteBitmapResponse* response,
Expand Down Expand Up @@ -1880,7 +1980,17 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}

// 3. store all pending delete bitmap for this txn
// 3. check if partition's version matches
if (request->lock_id() > 0 && request->has_txn_id() && request->has_partition_id() &&
request->has_next_visible_version()) {
if (!check_partition_version_when_update_delete_bitmap(
code, msg, txn, instance_id, table_id, request->partition_id(), tablet_id,
request->txn_id(), request->next_visible_version())) {
return;
}
}

// 4. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i),
Expand Down Expand Up @@ -1919,7 +2029,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}

// 4. Update delete bitmap for curent txn
// 5. Update delete bitmap for curent txn
size_t current_key_count = 0;
size_t current_value_count = 0;
size_t total_key_count = 0;
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3163,6 +3163,7 @@ void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
const std::string index_key = txn_index_key({instance_id, sub_txn_id});
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
Expand Down
Loading
Loading