diff --git a/cloud/script/run_all_tests.sh b/cloud/script/run_all_tests.sh index f2b5f4dd9b0b06..f89a70691495ea 100644 --- a/cloud/script/run_all_tests.sh +++ b/cloud/script/run_all_tests.sh @@ -142,12 +142,12 @@ for i in *_test; do if [[ "${fdb}" != "" ]]; then patchelf --set-rpath "$(pwd)" "${i}" fi - + LLVM_PROFILE_FILE="./report/${i}.profraw" set -euo pipefail if [[ "${filter}" == "" ]]; then - LLVM_PROFILE_FILE="./report/${i}.profraw" "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" + "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" else - LLVM_PROFILE_FILE="./report/${i}.profraw" "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" --gtest_filter="${filter}" + "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" --gtest_filter="${filter}" fi set +euo pipefail unittest_files[${#unittest_files[*]}]="${i}" diff --git a/cloud/src/meta-service/injection_point_http.cpp b/cloud/src/meta-service/injection_point_http.cpp index 910b709ca89d1a..e79e2e527704ce 100644 --- a/cloud/src/meta-service/injection_point_http.cpp +++ b/cloud/src/meta-service/injection_point_http.cpp @@ -91,6 +91,27 @@ static void register_suites() { } }); }); + + suite_map.emplace("sleep_before_execute_commit_rowset", [] { + int duration_ms = 15000; + std::shared_ptr hit(new std::atomic_long(0)); + LOG(INFO) << "register injection point sleep_before_execute_commit_rowset sleep for " + << duration_ms << " ms"; + auto sp = SyncPoint::get_instance(); + + sp->set_call_back("MetaServiceImpl::commit_rowset", [duration_ms, hit](auto&& args) { + if (hit->fetch_add(1) == 0) { + const CreateRowsetRequest* req = try_any_cast(args[0]); + LOG(INFO) << "hit injection point sleep_before_execute_commit_rowset sleep for " + << duration_ms << " ms, request=" << req->ShortDebugString(); + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + return; + } + LOG(WARNING) << "injection point sleep_before_execute_commit_rowset without sleep, hit=" + << hit->load(); + }); + }); + suite_map.emplace("Transaction::commit.enable_inject", []() { auto* sp = SyncPoint::get_instance(); sp->set_call_back("Transaction::commit.inject_random_fault", [](auto&& args) { @@ -366,4 +387,4 @@ HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op); } -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index c7c0827edf4a8a..e8ceb909a4ffdd 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1119,6 +1119,7 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle CreateRowsetResponse* response, ::google::protobuf::Closure* done) { RPC_PREPROCESS(commit_rowset); + TEST_INJECTION_POINT_CALLBACK("MetaServiceImpl::commit_rowset", request); if (!request->has_rowset_meta()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = "no rowset meta"; @@ -1222,6 +1223,16 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle } auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id}); + std::string recycle_rs_val; + err = txn->get(recycle_rs_key, &recycle_rs_val); + if (err != TxnErrorCode::TXN_OK) { + std::stringstream ss; + ss << "failed to get recycle_rowset_key when commit rowset instance_id=" << instance_id + << " tablet_id=" << tablet_id << " rowset_id=" << rowset_id << " ret=" << err; + code = cast_as(err); + msg = ss.str(); + return; + } txn->remove(recycle_rs_key); DCHECK_GT(rowset_meta.txn_expiration(), 0); diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index 024acd0efe7852..87d5958ffef4c1 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) { } int HdfsAccessor::init() { + TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init", 0); TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1); // TODO(plat1ko): Cache hdfsFS fs_ = HDFSBuilder::create_fs(info_.build_conf()); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index ad16bc96d92311..c503e2a192df75 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1991,7 +1991,7 @@ int InstanceRecycler::recycle_rowsets() { return final_expiration; }; - auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int { + auto handle_recycle_rowset_kv = [&](std::string_view k, std::string_view v) -> int { ++num_scanned; total_rowset_key_size += k.size(); total_rowset_value_size += v.size(); @@ -2008,6 +2008,13 @@ int InstanceRecycler::recycle_rowsets() { if (current_time < calc_expiration(rowset)) { // not expired return 0; } + // TODO(gavin): The recycle key is marked as an expired rowset, indicating that it should be + // recycled. However, it may still be referenced by load, SC, or compaction + // processes. + // To resolve this, create a key-value transaction to read and update its + // status to EXPIRED. This will ensure that it conflicts with other commit + // procedures, preventing potential issues. + ++num_expired; expired_rowset_size += v.size(); if (!rowset.has_type()) { // old version `RecycleRowsetPB` @@ -2097,7 +2104,7 @@ int InstanceRecycler::recycle_rowsets() { return 0; }; - int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv), + int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_recycle_rowset_kv), std::move(loop_done)); worker_pool->stop(); diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp index 60226e7f9521a2..ae5e7307ce6aec 100644 --- a/cloud/test/fdb_injection_test.cpp +++ b/cloud/test/fdb_injection_test.cpp @@ -93,7 +93,7 @@ int main(int argc, char** argv) { [](auto&& args) { *try_any_cast(args[0]) = 0; }); sp->set_call_back("put_schema_kv:schema_key_exists_return", [](auto&& args) { *try_any_cast(args.back()) = true; }); - sp->set_call_back("resource_manager::set_safe_drop_time", + sp->set_call_back("resource_manager::set_safe_drop_time", // make it always safe to drop [](auto&& args) { *try_any_cast(args[0]) = -1; }); meta_service = create_meta_service(); diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index 3d7e3a6311ac99..7752005ca476cd 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -103,6 +103,14 @@ doris::RowsetMetaCloudPB create_rowset(int64_t tablet_id, int64_t start_version, return rowset; } +static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, + CreateRowsetResponse& res) { + brpc::Controller cntl; + CreateRowsetRequest req; + req.mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, &req, &res, nullptr); +} + void commit_rowset(MetaService* meta_service, const doris::RowsetMetaCloudPB& rowset, CreateRowsetResponse& res) { brpc::Controller cntl; @@ -2353,6 +2361,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobTest) { for (int64_t i = 0; i < 5; ++i) { output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2)); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), output_rowsets.back(), res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), output_rowsets.back(), res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i; } @@ -2430,6 +2441,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobTest) { output_rowsets.push_back(create_rowset(new_tablet_id, 13, 13)); for (auto& rs : output_rowsets) { CreateRowsetResponse res; + prepare_rowset(meta_service.get(), rs, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), rs, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << rs.end_version(); } @@ -2566,8 +2580,12 @@ TEST(MetaServiceJobTest, RetrySchemaChangeJobTest) { be1_output_rowsets.push_back(create_rowset(new_tablet_id, 11, 11)); for (auto& rs : be1_output_rowsets) { CreateRowsetResponse res; + prepare_rowset(meta_service.get(), rs, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), rs, res); - ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << rs.end_version(); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) + << "end_version=" << rs.end_version() << res.status().ShortDebugString(); } sc_res.Clear(); // FE thinks BE1 is not alive and retries "job2" on BE2, should preempt "job2" created by BE1 @@ -2579,17 +2597,28 @@ TEST(MetaServiceJobTest, RetrySchemaChangeJobTest) { { CreateRowsetResponse res; // [2-8] has committed by BE1 - commit_rowset(meta_service.get(), create_rowset(new_tablet_id, 2, 8), res); + auto rs_meta = create_rowset(new_tablet_id, 2, 8); + prepare_rowset(meta_service.get(), rs_meta, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) + << res.status().ShortDebugString(); + res.Clear(); + commit_rowset(meta_service.get(), rs_meta, res); ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED); ASSERT_TRUE(res.has_existed_rowset_meta()); ASSERT_EQ(res.existed_rowset_meta().rowset_id_v2(), be1_output_rowsets[0].rowset_id_v2()); be2_output_rowsets.push_back(res.existed_rowset_meta()); res.Clear(); be2_output_rowsets.push_back(create_rowset(new_tablet_id, 9, 12)); + prepare_rowset(meta_service.get(), be2_output_rowsets.back(), res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), be2_output_rowsets.back(), res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); res.Clear(); be2_output_rowsets.push_back(create_rowset(new_tablet_id, 13, 13)); + prepare_rowset(meta_service.get(), be2_output_rowsets.back(), res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), be2_output_rowsets.back(), res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -2711,6 +2740,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) { for (int64_t i = 0; i < 5; ++i) { output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2)); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), output_rowsets.back(), res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), output_rowsets.back(), res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i; } @@ -2762,6 +2794,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) { for (int64_t i = 0; i < 5; ++i) { output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2)); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), output_rowsets.back(), res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), output_rowsets.back(), res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i; } @@ -2916,9 +2951,14 @@ TEST(MetaServiceJobTest, ConcurrentCompactionTest) { { // Provide output rowset auto output_rowset = create_rowset(tablet_id, 5, 10); - CreateRowsetResponse rowset_res; - commit_rowset(meta_service.get(), output_rowset, rowset_res); - ASSERT_EQ(rowset_res.status().code(), MetaServiceCode::OK); + { + CreateRowsetResponse res; + prepare_rowset(meta_service.get(), output_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); + commit_rowset(meta_service.get(), output_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } FinishTabletJobRequest req; FinishTabletJobResponse res; @@ -3028,9 +3068,14 @@ TEST(MetaServiceJobTest, ConcurrentCompactionTest) { { // Provide output rowset auto output_rowset = create_rowset(tablet_id, 2, 4); - CreateRowsetResponse rowset_res; - commit_rowset(meta_service.get(), output_rowset, rowset_res); - ASSERT_EQ(rowset_res.status().code(), MetaServiceCode::OK); + { + CreateRowsetResponse res; + prepare_rowset(meta_service.get(), output_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); + commit_rowset(meta_service.get(), output_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } FinishTabletJobRequest req; FinishTabletJobResponse res; diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 7e8a73c3bf62af..de75506fcab0ff 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -193,6 +193,9 @@ static void commit_txn(MetaServiceProxy* meta_service, int64_t db_id, int64_t tx ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; } +/** + * Generates a rowset meta represented by RowsetMetaCloudPB + */ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id = 10, int64_t version = -1, int num_rows = 100) { @@ -1747,6 +1750,9 @@ TEST(MetaServiceTest, CommitTxnTest) { create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1845,6 +1851,9 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) { create_tablet(meta_service.get(), 1234789234, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1873,6 +1882,9 @@ static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t tab create_tablet(meta_service, table_id, index_id, partition_id, tablet_id); auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service, tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service, tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -2466,6 +2478,9 @@ TEST(MetaServiceTest, AbortTxnTest) { create_tablet(meta_service.get(), 12345, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -2516,8 +2531,11 @@ TEST(MetaServiceTest, AbortTxnTest) { create_tablet(meta_service.get(), table_id, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; - commit_rowset(meta_service.get(), tmp_rowset, res); + prepare_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); + commit_rowset(meta_service.get(), tmp_rowset, res); + EXPECT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().ShortDebugString(); } // abort txn by db_id and label @@ -2706,6 +2724,9 @@ TEST(MetaServiceTest, CheckTxnConflictTest) { create_tablet(meta_service.get(), table_id, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -2977,6 +2998,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) { create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -3230,6 +3254,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) { create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -3298,6 +3325,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) { create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -6145,6 +6175,9 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); tmp_rowset.set_partition_id(partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index c3cf5051995157..b12e67586a4047 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -3494,6 +3494,12 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) { txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); } + // ATTN(gavin): we have to inject this point, otherwise it may complain ASAN memory issue of JVM + sp->set_call_back("HdfsAccessor::init", [](auto&& args) { + auto* ret = try_any_cast*>(args[0]); + ret->first = -1; // return -1, make it fail + ret->second = true; + }); // failed to init because accessor->init() fails { HdfsBuildConf hdfs_build_conf; @@ -3538,7 +3544,8 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) { }); sp->enable_processing(); - // succeed to init MockAccessor + // succeed to init MockAccessor, sync point "HdfsAccessor::init" is set to return -1 but it is + // ignored by sync point "InstanceRecycler::init_storage_vault_accessors.mock_vault" { HdfsBuildConf hdfs_build_conf; StorageVaultPB vault; diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp index e2aa09514360e1..9b319989c8fe59 100644 --- a/cloud/test/resource_test.cpp +++ b/cloud/test/resource_test.cpp @@ -209,7 +209,7 @@ static void drop_cluster(MetaServiceProxy* ms, const std::string& instance_id, req.set_op(AlterClusterRequest::DROP_CLUSTER); AlterClusterResponse res; ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing()); - ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().ShortDebugString(); } // test cluster's node addr use ip @@ -402,7 +402,7 @@ TEST(ResourceTest, AddDropCluster) { auto* ret = try_any_cast(args[1]); *ret = 0; }); - sp->set_call_back("resource_manager::set_safe_drop_time", + sp->set_call_back("resource_manager::set_safe_drop_time", // make it always safe to drop [](auto&& args) { *try_any_cast(args[0]) = -1; }); sp->enable_processing(); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 5737e6e9eef366..bcc9a1afa0510c 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -175,6 +175,16 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, return rowset; } +static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, + CreateRowsetResponse& res) { + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; +} + static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, CreateRowsetResponse& res) { brpc::Controller cntl; @@ -425,6 +435,9 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); tmp_rowsets_meta.push_back(std::make_pair("mock_tmp_rowset_key", tmp_rowset)); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -512,6 +525,9 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -593,6 +609,9 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -702,6 +721,9 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -780,6 +802,9 @@ TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -868,6 +893,9 @@ TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -956,6 +984,9 @@ TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) { tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1113,6 +1144,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1160,6 +1194,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1351,6 +1388,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1398,6 +1438,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1508,6 +1551,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1617,6 +1663,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1768,6 +1817,9 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { for (int i = 0; i < 10; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } @@ -1853,6 +1905,9 @@ TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { { auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id); CreateRowsetResponse res; + prepare_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); commit_rowset(meta_service.get(), tmp_rowset, res); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); }