diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index aaff8e948566b8..5b9ae571bb3adc 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -121,6 +121,7 @@ void convert_tmp_rowsets( LOG(WARNING) << msg; return; } + VersionPB version_pb; if (!version_pb.ParseFromString(ver_val)) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; @@ -130,6 +131,17 @@ void convert_tmp_rowsets( } LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key) << " version_pb:" << version_pb.ShortDebugString(); + + if (version_pb.pending_txn_ids_size() == 0 || version_pb.pending_txn_ids(0) != txn_id) { + LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id() + << " tmp_rowset_key=" << hex(tmp_rowset_key) + << " version has already been converted." + << " version_pb:" << version_pb.ShortDebugString(); + TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted", + &version_pb); + return; + } + partition_versions.emplace(tmp_rowset_pb.partition_id(), version_pb); DCHECK_EQ(partition_versions.size(), 1) << partition_versions.size(); } @@ -279,6 +291,8 @@ void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, i txn->put(recycle_key, recycle_val); LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" << txn_id; + TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit", + &code); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 5b1686eed28745..d75ebca81846cd 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -156,8 +156,8 @@ static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; } -static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int index_id, - int partition_id, int64_t version = -1, +static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int64_t index_id, + int64_t partition_id, int64_t version = -1, int num_rows = 100) { doris::RowsetMetaCloudPB rowset; rowset.set_rowset_id(0); // required @@ -178,9 +178,9 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, return rowset; } -static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id, - int partition_id, int64_t version = -1, - int num_rows = 100) { +static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, + int64_t index_id, int64_t partition_id, + int64_t version = -1, int num_rows = 100) { doris::RowsetMetaCloudPB rowset; rowset.set_rowset_id(0); // required rowset.set_rowset_id_v2(next_rowset_id()); @@ -3088,4 +3088,171 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { config::txn_lazy_max_rowsets_per_batch = 2; } +TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) { + // =========================================================================== + // threads concurrent execution flow: + // + // thread1 lazy thread1 thread3 + // | | | + // commit_txn_eventually begin | | + // | | | + // lazy commit wait | | + // | | | + // | make_committed_txn_visible | + // | | | + // | inject TXN_TOO_OLD fdb error | + // | | sc create new tablet tmp rowset + // | | | + // | | | + // retry commit_txn | | + // v v + config::txn_lazy_max_rowsets_per_batch = 1000; + auto txn_kv = get_mem_txn_kv(); + int64_t db_id = 4534445675; + int64_t table_id = 4365676543; + int64_t index_id = 665453237; + int64_t partition_id = 2136776543678; + + bool go = false; + std::mutex go_mutex; + std::condition_variable go_cv; + std::atomic make_committed_txn_visible_count = {0}; + std::atomic sc_create_tmp_rowset_count = {0}; + std::atomic sc_create_tmp_rowset_finish_count = {0}; + std::atomic tmp_rowsets_been_already_converted = {0}; + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", [&](auto&& args) { + LOG(INFO) << "zhangleiyyy"; + { + std::unique_lock _lock(go_mutex); + if (make_committed_txn_visible_count == 0) { + make_committed_txn_visible_count++; + if (sc_create_tmp_rowset_count == 0) { + go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count == 1; }); + } + MetaServiceCode* code = try_any_cast(args[0]); + *code = MetaServiceCode::KV_TXN_CONFLICT; + bool* pred = try_any_cast(args.back()); + *pred = true; + LOG(INFO) << "inject kv error KV_TXN_CONFLICT"; + go_cv.notify_all(); + } + } + }); + + sp->set_call_back("convert_tmp_rowsets::already_been_converted", [&](auto&& args) { + auto version_pb = *try_any_cast(args[0]); + LOG(INFO) << "version_pb:" << version_pb.ShortDebugString(); + std::unique_lock _lock(go_mutex); + tmp_rowsets_been_already_converted++; + go_cv.notify_all(); + }); + + sp->enable_processing(); + + auto meta_service = get_meta_service(txn_kv, true); + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_sc_with_commit_txn_label"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, + nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + int64_t txn_id = res.txn_id(); + + // mock rowset and tablet + int64_t tablet_id_base = 3131124; + for (int i = 0; i < 1000; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id_base + i); + auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + std::thread thread1([&] { + { + { + std::unique_lock _lock(go_mutex); + go_cv.wait(_lock, [&] { return go; }); + } + + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + LOG(INFO) << "thread1 finish"; + }); + + std::thread thread2([&] { + { + { + std::unique_lock _lock(go_mutex); + go_cv.wait(_lock, [&] { return go; }); + } + + { + std::unique_lock _lock(go_mutex); + sc_create_tmp_rowset_count++; + if (make_committed_txn_visible_count == 0) { + go_cv.wait(_lock, [&] { return make_committed_txn_visible_count > 0; }); + } + for (int i = 0; i < 1000; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, + partition_id, tablet_id_base + i); + auto tmp_rowset = + create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + LOG(INFO) << "sc_create_tmp_rowset_finish_count finish"; + sc_create_tmp_rowset_finish_count++; + go_cv.notify_all(); + } + LOG(INFO) << "thread2 finish"; + } + }); + + std::unique_lock go_lock(go_mutex); + go = true; + go_lock.unlock(); + go_cv.notify_all(); + + thread1.join(); + thread2.join(); + + ASSERT_GT(tmp_rowsets_been_already_converted, 1); + { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string mock_instance = "test_instance"; + for (int i = 0; i < 1000; ++i) { + int64_t tablet_id = tablet_id_base + i; + check_tablet_idx_db_id(txn, db_id, tablet_id); + check_tmp_rowset_exist(txn, tablet_id, txn_id); + check_rowset_meta_exist(txn, tablet_id, 2); + } + } + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); +} + } // namespace doris::cloud