Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void convert_tmp_rowsets(
LOG(WARNING) << msg;
return;
}

VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
Expand All @@ -130,6 +131,17 @@ void convert_tmp_rowsets(
}
LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
<< " version_pb:" << version_pb.ShortDebugString();

if (version_pb.pending_txn_ids_size() == 0 || version_pb.pending_txn_ids(0) != txn_id) {
LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id()
<< " tmp_rowset_key=" << hex(tmp_rowset_key)
<< " version has already been converted."
<< " version_pb:" << version_pb.ShortDebugString();
TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted",
&version_pb);
return;
}

partition_versions.emplace(tmp_rowset_pb.partition_id(), version_pb);
DCHECK_EQ(partition_versions.size(), 1) << partition_versions.size();
}
Expand Down Expand Up @@ -279,6 +291,8 @@ void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, i
txn->put(recycle_key, recycle_val);
LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" << txn_id;

TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit",
&code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down
177 changes: 172 additions & 5 deletions cloud/test/txn_lazy_commit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}

static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int index_id,
int partition_id, int64_t version = -1,
static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int64_t index_id,
int64_t partition_id, int64_t version = -1,
int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
Expand All @@ -178,9 +178,9 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
return rowset;
}

static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id,
int partition_id, int64_t version = -1,
int num_rows = 100) {
static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id,
int64_t index_id, int64_t partition_id,
int64_t version = -1, int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(next_rowset_id());
Expand Down Expand Up @@ -3088,4 +3088,171 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
config::txn_lazy_max_rowsets_per_batch = 2;
}

// Runs a lazy commit_txn (thread1) concurrently with a second writer (thread2)
// that re-creates the same tablets and tmp rowsets for the same txn, while a
// sync point forces the first commit inside make_committed_txn_visible to
// report KV_TXN_CONFLICT. The test then expects that convert_tmp_rowsets hit its
// "already been converted" branch more than once and that the tablet index,
// tmp rowset and rowset meta checks pass for every tablet.
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
    // ===========================================================================
    // threads concurrent execution flow:
    //
    //      thread1                   lazy committer                    thread2
    //         |                            |                              |
    // commit_txn_eventually begin          |                              |
    //         |                            |                              |
    //  lazy commit wait                    |                              |
    //         |                            |                              |
    //         |              make_committed_txn_visible                   |
    //         |                            |                              |
    //         |              inject KV_TXN_CONFLICT error                 |
    //         |                            |        sc create new tablet tmp rowset
    //         |                            |                              |
    //   retry commit_txn                   |                              |
    //         v                            v                              v
    //
    // NOTE: the relative order of the error injection and thread2's rowset
    // creation depends on which side takes go_mutex first; both orders are
    // handled by the rendezvous on the two counters below.
    // ===========================================================================
    config::txn_lazy_max_rowsets_per_batch = 1000;
    auto txn_kv = get_mem_txn_kv();
    int64_t db_id = 4534445675;
    int64_t table_id = 4365676543;
    int64_t index_id = 665453237;
    int64_t partition_id = 2136776543678;

    // `go` releases both worker threads at once; go_mutex/go_cv also guard the
    // rendezvous between the commit sync point and thread2.
    bool go = false;
    std::mutex go_mutex;
    std::condition_variable go_cv;
    std::atomic<int32_t> make_committed_txn_visible_count = {0};
    std::atomic<int32_t> sc_create_tmp_rowset_count = {0};
    std::atomic<int32_t> sc_create_tmp_rowset_finish_count = {0};
    std::atomic<int32_t> tmp_rowsets_been_already_converted = {0};

    auto sp = SyncPoint::get_instance();
    // Only the first arrival at the commit sync point is intercepted; later
    // arrivals fall through untouched.
    sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", [&](auto&& args) {
        std::unique_lock<std::mutex> _lock(go_mutex);
        if (make_committed_txn_visible_count == 0) {
            make_committed_txn_visible_count++;
            if (sc_create_tmp_rowset_count == 0) {
                go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count == 1; });
            }
            // args[0] is the MetaServiceCode* handed to the sync point; the last
            // argument is a bool* flag (presumably tells the sync point macro to
            // return early -- confirm against the macro definition).
            MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[0]);
            *code = MetaServiceCode::KV_TXN_CONFLICT;
            bool* pred = try_any_cast<bool*>(args.back());
            *pred = true;
            LOG(INFO) << "inject kv error KV_TXN_CONFLICT";
            go_cv.notify_all();
        }
    });

    // Counts how many times convert_tmp_rowsets found the version already converted.
    sp->set_call_back("convert_tmp_rowsets::already_been_converted", [&](auto&& args) {
        auto version_pb = *try_any_cast<VersionPB*>(args[0]);
        LOG(INFO) << "version_pb:" << version_pb.ShortDebugString();
        std::unique_lock<std::mutex> _lock(go_mutex);
        tmp_rowsets_been_already_converted++;
        go_cv.notify_all();
    });

    sp->enable_processing();

    auto meta_service = get_meta_service(txn_kv, true);
    brpc::Controller cntl;
    BeginTxnRequest req;
    req.set_cloud_unique_id("test_cloud_unique_id");
    TxnInfoPB txn_info_pb;
    txn_info_pb.set_db_id(db_id);
    txn_info_pb.set_label("test_sc_with_commit_txn_label");
    txn_info_pb.add_table_ids(table_id);
    txn_info_pb.set_timeout_ms(36000);
    req.mutable_txn_info()->CopyFrom(txn_info_pb);
    BeginTxnResponse res;
    meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
                            nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
    int64_t txn_id = res.txn_id();

    // mock rowset and tablet
    int64_t tablet_id_base = 3131124;
    for (int i = 0; i < 1000; ++i) {
        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
                                 tablet_id_base + i);
        auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
        CreateRowsetResponse res;
        commit_rowset(meta_service.get(), tmp_rowset, res);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
    }

    // thread1: issues the lazy commit_txn once released.
    std::thread thread1([&] {
        {
            {
                std::unique_lock<std::mutex> _lock(go_mutex);
                go_cv.wait(_lock, [&] { return go; });
            }

            brpc::Controller cntl;
            CommitTxnRequest req;
            req.set_cloud_unique_id("test_cloud_unique_id");
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            req.set_is_2pc(false);
            req.set_enable_txn_lazy_commit(true);
            CommitTxnResponse res;
            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                     &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }
        LOG(INFO) << "thread1 finish";
    });

    // thread2: re-creates the tablets and tmp rowsets of the same txn while the
    // commit is in flight.
    std::thread thread2([&] {
        {
            {
                std::unique_lock<std::mutex> _lock(go_mutex);
                go_cv.wait(_lock, [&] { return go; });
            }

            {
                std::unique_lock<std::mutex> _lock(go_mutex);
                sc_create_tmp_rowset_count++;
                // Wake a commit callback that may already be waiting on the counter.
                // It cannot proceed until this scope releases go_mutex, so the
                // ordering on the success path is unchanged, but it is no longer
                // stranded forever if an ASSERT below makes this lambda return early.
                go_cv.notify_all();
                if (make_committed_txn_visible_count == 0) {
                    go_cv.wait(_lock, [&] { return make_committed_txn_visible_count > 0; });
                }
                for (int i = 0; i < 1000; ++i) {
                    create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id,
                                             partition_id, tablet_id_base + i);
                    auto tmp_rowset =
                            create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
                    CreateRowsetResponse res;
                    commit_rowset(meta_service.get(), tmp_rowset, res);
                    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
                }
                LOG(INFO) << "sc_create_tmp_rowset_finish_count finish";
                sc_create_tmp_rowset_finish_count++;
                go_cv.notify_all();
            }
            LOG(INFO) << "thread2 finish";
        }
    });

    // Release both workers.
    std::unique_lock<std::mutex> go_lock(go_mutex);
    go = true;
    go_lock.unlock();
    go_cv.notify_all();

    thread1.join();
    thread2.join();

    // thread2 must have run its whole creation loop exactly once.
    ASSERT_EQ(sc_create_tmp_rowset_finish_count.load(), 1);
    ASSERT_GT(tmp_rowsets_been_already_converted, 1);
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (int i = 0; i < 1000; ++i) {
            int64_t tablet_id = tablet_id_base + i;
            check_tablet_idx_db_id(txn, db_id, tablet_id);
            check_tmp_rowset_exist(txn, tablet_id, txn_id);
            check_rowset_meta_exist(txn, tablet_id, 2);
        }
    }
    sp->clear_all_call_backs();
    sp->clear_trace();
    sp->disable_processing();
}

} // namespace doris::cloud
Loading