From ff682fddc89526878ee1d163c7c67363e140fc19 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 15 Apr 2025 01:10:24 +0800 Subject: [PATCH 1/9] 1 --- cloud/src/recycler/recycler.cpp | 51 +++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 581ee4067d4c17..d3001b4f09dd70 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2500,6 +2500,7 @@ int InstanceRecycler::recycle_expired_txn_label() { int64_t num_scanned = 0; int64_t num_expired = 0; int64_t num_recycled = 0; + int ret = 0; RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0}; RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, INT64_MAX}; @@ -2507,6 +2508,7 @@ int InstanceRecycler::recycle_expired_txn_label() { std::string end_recycle_txn_key; recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key); recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key); + std::vector recycle_txn_info_keys; LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_); @@ -2534,12 +2536,15 @@ int InstanceRecycler::recycle_expired_txn_label() { return final_expiration; }; + SyncExecutor concurrent_delete_executor( + _thread_pool_group.s3_producer_pool, + fmt::format("recycle expired txn label, instance id {}", instance_id_), + [](const int& ret) { return ret != 0; }); + int64_t current_time_ms = duration_cast(system_clock::now().time_since_epoch()).count(); - auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, ¤t_time_ms, - &calc_expiration, - this](std::string_view k, std::string_view v) -> int { + auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v) -> int { ++num_scanned; RecycleTxnPB recycle_txn_pb; if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) { @@ -2551,10 +2556,12 @@ int InstanceRecycler::recycle_expired_txn_label() { (calc_expiration(recycle_txn_pb) <= current_time_ms)) { VLOG_DEBUG << "found recycle txn, key=" << hex(k); num_expired++; - } else { - return 0; + recycle_txn_info_keys.emplace_back(k); } + return 0; + }; + auto delete_recycle_txn_kv = [&](const std::string& k) -> int { std::string_view k1 = k; //RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id k1.remove_prefix(1); // Remove key space @@ -2638,8 +2645,40 @@ int InstanceRecycler::recycle_expired_txn_label() { return 0; }; + auto loop_done = [&]() -> int { + for (const auto& k : recycle_txn_info_keys) { + concurrent_delete_executor.add([&]() { + if (delete_recycle_txn_kv(k) != 0) { + LOG_WARNING("failed to delete recycle txn kv") + .tag("instance id", instance_id_) + .tag("key", hex(k)); + return -1; + } + return 0; + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { + ret = -1; + } + } + + ret = finished ? ret : -1; + + if (ret != 0) { + LOG_WARNING("recycle txn kv ret!=0") + .tag("finished", finished) + .tag("ret", ret) + .tag("instance_id", instance_id_); + return ret; + } + return ret; + }; + return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, - std::move(handle_recycle_txn_kv)); + std::move(handle_recycle_txn_kv), std::move(loop_done)); } struct CopyJobIdTuple { From a3095b4fbf0b216a253b250aa1641339487ae8c1 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 15 Apr 2025 16:32:03 +0800 Subject: [PATCH 2/9] 2 --- cloud/src/recycler/recycler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index d3001b4f09dd70..60fed39765c84d 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2583,7 +2583,6 @@ int InstanceRecycler::recycle_expired_txn_label() { // Remove txn index kv auto index_key = txn_index_key({instance_id_, txn_id}); txn->remove(index_key); - // Remove txn info kv std::string info_key, info_val; txn_info_key({instance_id_, db_id, txn_id}, &info_key); err = txn->get(info_key, &info_val); @@ -2596,7 +2595,6 @@ int InstanceRecycler::recycle_expired_txn_label() { LOG_WARNING("failed to parse txn info").tag("key", hex(info_key)); return -1; } - txn->remove(info_key); // Remove sub txn index kvs std::vector sub_txn_index_keys; for (auto sub_txn_id : txn_info.sub_txn_ids()) { @@ -2633,6 +2631,8 @@ int InstanceRecycler::recycle_expired_txn_label() { } txn->atomic_set_ver_value(label_key, label_val); } + // Remove txn info kv + txn->remove(info_key); // Remove recycle txn kv txn->remove(k); err = txn->commit(); From d8c84b06ac5c5c14c1f182a58fac99e08ce41788 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 18 Apr 2025 01:10:40 +0800 Subject: [PATCH 3/9] 3 --- cloud/test/recycler_test.cpp | 231 +++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 5b9203b1d4ef79..08af3ce0058559 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -17,11 +17,13 @@ #include "recycler/recycler.h" +#include #include #include #include #include +#include #include #include #include @@ -35,11 +37,13 @@ #include "meta-service/keys.h" #include "meta-service/mem_txn_kv.h" #include "meta-service/meta_service.h" +#include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" #include "mock_accessor.h" #include "mock_resource_manager.h" #include "rate-limiter/rate_limiter.h" #include "recycler/checker.h" +#include "recycler/recycler.cpp" #include "recycler/storage_vault_accessor.h" #include "recycler/util.h" #include "recycler/white_black_list.h" @@ -4051,4 +4055,231 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) { } } +static std::string generate_random_string(int length) { + std::string char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_int_distribution distribution(0, char_set.length() - 1); + + std::string randomString; + for (int i = 0; i < length; ++i) { + randomString += char_set[distribution(generator)]; + } + return randomString; +} + +std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10); + +int put_single_kv(const std::unique_ptr& txn, int64_t i) { + std::string key; + std::string val; + int64_t db_id = i; + int64_t txn_id = 10000 + i; + int64_t sub_txn_id = 20000 + i; + int64_t current_time = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // RecycleTxnKeyInfo -> RecycleTxnPB + RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id}; + recycle_txn_key(recycle_txn_key_info, &key); + RecycleTxnPB recycle_txn_pb; + if (i < 8000) { + recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L); + } else { + recycle_txn_pb.set_creation_time(current_time); + } + recycle_txn_pb.set_label("recycle_txn_key_info_label_" + std::to_string(i)); + if (!recycle_txn_pb.SerializeToString(&val)) { + LOG_WARNING("failed to serialize recycle txn info") + .tag("key", hex(key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return -1; + } + LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + recycle_txn_pb.label(), hex(key)); + txn->put(key, val); + + // TxnIndexKey -> TxnIndexPB + key.clear(); + val.clear(); + key = txn_index_key({instance_id, txn_id}); + TxnIndexPB txn_index_pb; + if (!txn_index_pb.SerializeToString(&val)) { + LOG_WARNING("failed to serialize txn index") + .tag("key", hex(key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return -1; + } + txn->put(key, val); + + // TxnInfoKey -> TxnInfoPB + key = txn_info_key({instance_id, db_id, txn_id}); + TxnInfoPB txn_info_pb; + txn_info_pb.add_sub_txn_ids(sub_txn_id); + txn_info_pb.set_label("txn_info_label_" + std::to_string(i)); + if (!txn_info_pb.SerializeToString(&val)) { + LOG_WARNING("failed to serialize txn info") + .tag("key", hex(key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return -1; + } + txn->put(key, val); + + // SubTxnIndex -> TxnIndexPB + key.clear(); + val.clear(); + key = txn_index_key({instance_id, sub_txn_id}); + TxnIndexPB sub_txn_index_pb; + if (!sub_txn_index_pb.SerializeToString(&val)) { + LOG_WARNING("failed to serialize sub txn index") + .tag("key", hex(key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return -1; + } + txn->put(key, val); + + // TxnLabel -> TxnLabelPB + key.clear(); + val.clear(); + txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key); + TxnLabelPB txn_label_pb; + txn_label_pb.add_txn_ids(txn_id); + LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + txn_info_pb.label(), hex(key)); + if (!txn_label_pb.SerializeToString(&val)) { + LOG_WARNING("failed to serialize txn label") + .tag("key", hex(key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return -1; + } + MemTxnKv::gen_version_timestamp(123456790, 0, &val); + txn->put(key, val); + + return 0; +} + +void put_kv(std::shared_ptr txn_kv) { + using namespace doris::cloud; + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + for (int64_t i = 0; i < 5000; i++) { + put_single_kv(txn, i); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + for (int64_t i = 5000; i < 10000; i++) { + put_single_kv(txn, i); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +void check_single_kv(const std::string_view& k, const std::string_view& v, + std::shared_ptr txn_kv) { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + // check RecycleTxnInfo + RecycleTxnPB recycle_txn_pb; + ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size())); + std::string_view k1 = k; + + // check TxnIndex + std::string index_key, index_value; + k1.remove_prefix(1); // Remove key space + std::vector, int, int>> out; + ASSERT_EQ(decode_key(&k1, &out), 0); + int64_t db_id = std::get(std::get<0>(out[3])); + int64_t txn_id = std::get(std::get<0>(out[4])); + index_key = txn_index_key({instance_id, txn_id}); + ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK); + + // check TxnInfo + std::string info_key, info_val; + txn_info_key({instance_id, db_id, txn_id}, &info_key); + ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); + + // check SubTxnIndex + TxnInfoPB txn_info; + ASSERT_TRUE(txn_info.ParseFromString(info_val)); + std::vector sub_txn_index_keys; + std::string sub_txn_index_value; + for (auto sub_txn_id : txn_info.sub_txn_ids()) { + auto sub_txn_index_key = txn_index_key({instance_id, sub_txn_id}); + sub_txn_index_keys.push_back(sub_txn_index_key); + } + for (auto& sub_txn_index_key : sub_txn_index_keys) { + ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value), TxnErrorCode::TXN_OK); + } + + // check TxnLabel + std::string label_key, label_val; + txn_label_key({instance_id, db_id, txn_info.label()}, &label_key); + ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK) + << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + txn_info.label(), hex(label_key)); +} + +void check_kv(std::shared_ptr txn_kv, int64_t size) { + using namespace doris::cloud; + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0}; + RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, INT64_MAX}; + std::string begin; + std::string end; + recycle_txn_key(recycle_txn_key_info0, &begin); + recycle_txn_key(recycle_txn_key_info1, &end); + int64_t total_kv = 0; + + std::unique_ptr it; + do { + int get_ret = txn_get(txn_kv.get(), begin, end, it); + if (get_ret != 0) { // txn kv may complain "Request for future version" + LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) + << ") txn_get_ret=" << get_ret; + ASSERT_TRUE(false); + } + if (!it->has_next()) { + LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; + break; // scan finished + } + while (it->has_next()) { + // recycle corresponding resources + auto [k, v] = it->next(); + if (!it->has_next()) { + begin = k; + VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); + } + check_single_kv(k, v, txn_kv); + total_kv++; + } + begin.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more()); + ASSERT_EQ(total_kv, size); +} + +TEST(RecyclerTest, concurrent_recycle_txn_label_test) { + cloud::config::init(nullptr, true); + cloud::config::fdb_cluster_file_path = "/mnt/disk2/lianyukang/doris/fdb_cluster"; + auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + ASSERT_TRUE(fdb_txn_kv.get()) << "exit get FdbTxnKv error" << std::endl; + ASSERT_EQ(fdb_txn_kv->init(), 0) << "exit inti FdbTxnKv error" << std::endl; + put_kv(fdb_txn_kv); + check_kv(fdb_txn_kv, 10000); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + InstanceRecycler recycler(fdb_txn_kv, instance, thread_group, + std::make_shared(fdb_txn_kv)); + ASSERT_EQ(recycler.init(), 0); + ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); + check_kv(fdb_txn_kv, 2000); +} + } // namespace doris::cloud From ce75aa428415fa3600d08ee6283b1472487a9535 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 18 Apr 2025 01:22:33 +0800 Subject: [PATCH 4/9] 4 --- cloud/src/recycler/recycler.cpp | 4 ++-- cloud/test/recycler_test.cpp | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 60fed39765c84d..d3001b4f09dd70 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2583,6 +2583,7 @@ int InstanceRecycler::recycle_expired_txn_label() { // Remove txn index kv auto index_key = txn_index_key({instance_id_, txn_id}); txn->remove(index_key); + // Remove txn info kv std::string info_key, info_val; txn_info_key({instance_id_, db_id, txn_id}, &info_key); err = txn->get(info_key, &info_val); @@ -2595,6 +2596,7 @@ int InstanceRecycler::recycle_expired_txn_label() { LOG_WARNING("failed to parse txn info").tag("key", hex(info_key)); return -1; } + txn->remove(info_key); // Remove sub txn index kvs std::vector sub_txn_index_keys; for (auto sub_txn_id : txn_info.sub_txn_ids()) { @@ -2631,8 +2633,6 @@ int InstanceRecycler::recycle_expired_txn_label() { } txn->atomic_set_ver_value(label_key, label_val); } - // Remove txn info kv - txn->remove(info_key); // Remove recycle txn kv txn->remove(k); err = txn->commit(); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 08af3ce0058559..61c7268df5042c 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -72,6 +72,7 @@ int main(int argc, char** argv) { config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT ::testing::InitGoogleTest(&argc, argv); + config::recycle_pool_parallelism=1; auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); s3_producer_pool->start(); auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); From b00633f6d48856b1c37260192566fb589acb92f2 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 21 Apr 2025 17:33:39 +0800 Subject: [PATCH 5/9] 5 --- cloud/src/recycler/recycler.cpp | 1 + cloud/test/recycler_test.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index d3001b4f09dd70..6f11455ce00151 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2674,6 +2674,7 @@ int InstanceRecycler::recycle_expired_txn_label() { .tag("instance_id", instance_id_); return ret; } + recycle_txn_info_keys.clear(); return ret; }; diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 61c7268df5042c..f90025e0aec35e 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -72,7 +72,6 @@ int main(int argc, char** argv) { config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT ::testing::InitGoogleTest(&argc, argv); - config::recycle_pool_parallelism=1; auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); s3_producer_pool->start(); auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); @@ -4075,8 +4074,8 @@ int put_single_kv(const std::unique_ptr& txn, int64_t i) { std::string key; std::string val; int64_t db_id = i; - int64_t txn_id = 10000 + i; - int64_t sub_txn_id = 20000 + i; + int64_t txn_id = 100000 + i; + int64_t sub_txn_id = 200000 + i; int64_t current_time = duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); @@ -4266,8 +4265,9 @@ void check_kv(std::shared_ptr txn_kv, int64_t size) { } TEST(RecyclerTest, concurrent_recycle_txn_label_test) { + config::recycle_pool_parallelism = 32; cloud::config::init(nullptr, true); - cloud::config::fdb_cluster_file_path = "/mnt/disk2/lianyukang/doris/fdb_cluster"; + cloud::config::fdb_cluster_file_path = "fdb.cluster"; auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); ASSERT_TRUE(fdb_txn_kv.get()) << "exit get FdbTxnKv error" << std::endl; ASSERT_EQ(fdb_txn_kv->init(), 0) << "exit inti FdbTxnKv error" << std::endl; From 67e179248478e18f03c88dada80784ac40d7e3a4 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 16:06:16 +0800 Subject: [PATCH 6/9] 6 --- cloud/test/recycler_test.cpp | 117 ++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 58 deletions(-) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index f90025e0aec35e..173e10dae13424 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4070,9 +4070,11 @@ static std::string generate_random_string(int length) { std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10); -int put_single_kv(const std::unique_ptr& txn, int64_t i) { - std::string key; - std::string val; +// aaa +int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t i, + int64_t expired_kv_num) { + std::string recycle_txn_info_key; + std::string recycle_txn_info_val; int64_t db_id = i; int64_t txn_id = 100000 + i; int64_t sub_txn_id = 200000 + i; @@ -4082,105 +4084,102 @@ int put_single_kv(const std::unique_ptr& txn, int64_t i) { // RecycleTxnKeyInfo -> RecycleTxnPB RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id}; - recycle_txn_key(recycle_txn_key_info, &key); + recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key); RecycleTxnPB recycle_txn_pb; - if (i < 8000) { + if (i < expired_kv_num) { recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L); } else { recycle_txn_pb.set_creation_time(current_time); } recycle_txn_pb.set_label("recycle_txn_key_info_label_" + std::to_string(i)); - if (!recycle_txn_pb.SerializeToString(&val)) { + if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) { LOG_WARNING("failed to serialize recycle txn info") - .tag("key", hex(key)) + .tag("key", hex(recycle_txn_info_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); return -1; } LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, - recycle_txn_pb.label(), hex(key)); - txn->put(key, val); + recycle_txn_pb.label(), hex(recycle_txn_info_key)); + txn->put(recycle_txn_info_key, recycle_txn_info_val); // TxnIndexKey -> TxnIndexPB - key.clear(); - val.clear(); - key = txn_index_key({instance_id, txn_id}); + std::string txn_idx_key = txn_index_key({instance_id, txn_id}); + std::string txn_idx_val; TxnIndexPB txn_index_pb; - if (!txn_index_pb.SerializeToString(&val)) { + if (!txn_index_pb.SerializeToString(&txn_idx_val)) { LOG_WARNING("failed to serialize txn index") - .tag("key", hex(key)) + .tag("key", hex(txn_idx_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); return -1; } - txn->put(key, val); + txn->put(txn_idx_key, txn_idx_val); // TxnInfoKey -> TxnInfoPB - key = txn_info_key({instance_id, db_id, txn_id}); + std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + std::string info_val; TxnInfoPB txn_info_pb; txn_info_pb.add_sub_txn_ids(sub_txn_id); txn_info_pb.set_label("txn_info_label_" + std::to_string(i)); - if (!txn_info_pb.SerializeToString(&val)) { + if (!txn_info_pb.SerializeToString(&info_val)) { LOG_WARNING("failed to serialize txn info") - .tag("key", hex(key)) + .tag("key", hex(info_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); return -1; } - txn->put(key, val); + txn->put(info_key, info_val); // SubTxnIndex -> TxnIndexPB - key.clear(); - val.clear(); - key = txn_index_key({instance_id, sub_txn_id}); + std::string idx_key = txn_index_key({instance_id, sub_txn_id}); + std::string idx_val; TxnIndexPB sub_txn_index_pb; - if (!sub_txn_index_pb.SerializeToString(&val)) { + if (!sub_txn_index_pb.SerializeToString(&idx_val)) { LOG_WARNING("failed to serialize sub txn index") - .tag("key", hex(key)) + .tag("key", hex(idx_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); return -1; } - txn->put(key, val); + txn->put(idx_key, idx_val); // TxnLabel -> TxnLabelPB - key.clear(); - val.clear(); - txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key); + std::string label_key; + std::string label_val; + txn_label_key({instance_id, db_id, txn_info_pb.label()}, &label_key); TxnLabelPB txn_label_pb; txn_label_pb.add_txn_ids(txn_id); LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, - txn_info_pb.label(), hex(key)); - if (!txn_label_pb.SerializeToString(&val)) { + txn_info_pb.label(), hex(label_key)); + if (!txn_label_pb.SerializeToString(&label_val)) { LOG_WARNING("failed to serialize txn label") - .tag("key", hex(key)) + .tag("key", hex(label_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); return -1; } - MemTxnKv::gen_version_timestamp(123456790, 0, &val); - txn->put(key, val); + MemTxnKv::gen_version_timestamp(123456790, 0, &label_val); + txn->put(label_key, label_val); return 0; } -void put_kv(std::shared_ptr txn_kv) { +// bbb +void make_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t total_kv_num, + int64_t expired_kv_num) { using namespace doris::cloud; std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int64_t i = 0; i < 5000; i++) { - put_single_kv(txn, i); - } - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int64_t i = 5000; i < 10000; i++) { - put_single_kv(txn, i); + for (int64_t i = 0; i < total_kv_num; i++) { + make_single_txn_related_kvs(txn, i, expired_kv_num); } ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } -void check_single_kv(const std::string_view& k, const std::string_view& v, - std::shared_ptr txn_kv) { +// ccc +void check_single_txn_info_kvs(const std::string_view& k, const std::string_view& v, + std::shared_ptr txn_kv) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4225,16 +4224,15 @@ void check_single_kv(const std::string_view& k, const std::string_view& v, txn_info.label(), hex(label_key)); } -void check_kv(std::shared_ptr txn_kv, int64_t size) { +// ddd +void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t size) { using namespace doris::cloud; std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0}; RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, INT64_MAX}; - std::string begin; - std::string end; - recycle_txn_key(recycle_txn_key_info0, &begin); - recycle_txn_key(recycle_txn_key_info1, &end); + std::string begin = recycle_txn_key(recycle_txn_key_info0); + std::string end = recycle_txn_key(recycle_txn_key_info1); int64_t total_kv = 0; std::unique_ptr it; @@ -4256,7 +4254,7 @@ void check_kv(std::shared_ptr txn_kv, int64_t size) { begin = k; VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); } - check_single_kv(k, v, txn_kv); + check_single_txn_info_kvs(k, v, txn_kv); total_kv++; } begin.push_back('\x00'); // Update to next smallest key for iteration @@ -4267,20 +4265,23 @@ void check_kv(std::shared_ptr txn_kv, int64_t size) { TEST(RecyclerTest, concurrent_recycle_txn_label_test) { config::recycle_pool_parallelism = 32; cloud::config::init(nullptr, true); - cloud::config::fdb_cluster_file_path = "fdb.cluster"; - auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); - ASSERT_TRUE(fdb_txn_kv.get()) << "exit get FdbTxnKv error" << std::endl; - ASSERT_EQ(fdb_txn_kv->init(), 0) << "exit inti FdbTxnKv error" << std::endl; - put_kv(fdb_txn_kv); - check_kv(fdb_txn_kv, 10000); + auto mem_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + ASSERT_TRUE(mem_txn_kv.get()) << "exit get MemTxnKv error" << std::endl; + make_multiple_txn_info_kvs(mem_txn_kv, 10000, 8000); + check_multiple_txn_info_kvs(mem_txn_kv, 10000); InstanceInfoPB instance; instance.set_instance_id(instance_id); - InstanceRecycler recycler(fdb_txn_kv, instance, thread_group, - std::make_shared(fdb_txn_kv)); + InstanceRecycler recycler(mem_txn_kv, instance, thread_group, + std::make_shared(mem_txn_kv)); ASSERT_EQ(recycler.init(), 0); + auto start = std::chrono::steady_clock::now(); ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); - check_kv(fdb_txn_kv, 2000); + auto finish = std::chrono::steady_clock::now(); + std::cout << "recycle expired txn label cost=" + << std::chrono::duration_cast(finish - start).count() + << "ms" << std::endl; + check_multiple_txn_info_kvs(mem_txn_kv, 2000); } } // namespace doris::cloud From e8f247871ffd4ca0c878892d5c4caecf2350c58f Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 17:38:23 +0800 Subject: [PATCH 7/9] 7 --- cloud/test/recycler_test.cpp | 64 +++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 173e10dae13424..d96a5ad624490f 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4071,13 +4071,16 @@ static std::string generate_random_string(int length) { std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10); // aaa -int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t i, - int64_t expired_kv_num) { +void make_single_txn_related_kvs(std::shared_ptr txn_kv, int64_t i, + int64_t expired_kv_num) { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string recycle_txn_info_key; std::string recycle_txn_info_val; int64_t db_id = i; - int64_t txn_id = 100000 + i; - int64_t sub_txn_id = 200000 + i; + int64_t txn_id = 1000000 + i; + int64_t sub_txn_id = 2000000 + i; int64_t current_time = duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); @@ -4097,7 +4100,7 @@ int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t .tag("key", hex(recycle_txn_info_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); - return -1; + return; } LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, recycle_txn_pb.label(), hex(recycle_txn_info_key)); @@ -4112,7 +4115,7 @@ int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t .tag("key", hex(txn_idx_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); - return -1; + return; } txn->put(txn_idx_key, txn_idx_val); @@ -4127,7 +4130,7 @@ int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t .tag("key", hex(info_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); - return -1; + return; } txn->put(info_key, info_val); @@ -4140,7 +4143,7 @@ int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t .tag("key", hex(idx_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); - return -1; + return; } txn->put(idx_key, idx_val); @@ -4157,24 +4160,23 @@ int make_single_txn_related_kvs(const std::unique_ptr& txn, int64_t .tag("key", hex(label_key)) .tag("db_id", db_id) .tag("txn_id", txn_id); - return -1; + return; } MemTxnKv::gen_version_timestamp(123456790, 0, &label_val); txn->put(label_key, label_val); - return 0; + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } // bbb void make_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t total_kv_num, int64_t expired_kv_num) { using namespace doris::cloud; - std::unique_ptr txn; - ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int64_t i = 0; i < total_kv_num; i++) { - make_single_txn_related_kvs(txn, i, expired_kv_num); + for (int64_t i = 0; i < total_kv_num; i += 2000) { + for (int64_t j = i; j < i + 2000; j++) { + make_single_txn_related_kvs(txn_kv, j, expired_kv_num); + } } - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } // ccc @@ -4263,17 +4265,35 @@ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t s } TEST(RecyclerTest, concurrent_recycle_txn_label_test) { - config::recycle_pool_parallelism = 32; + doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; + config::recycle_pool_parallelism = 20; + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); + recycle_tablet_pool->start(); + auto group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + group_recycle_function_pool->start(); + recycle_txn_label_thread_group = + RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), + std::move(group_recycle_function_pool)); cloud::config::init(nullptr, true); + auto mem_txn_kv = std::dynamic_pointer_cast(std::make_shared()); - ASSERT_TRUE(mem_txn_kv.get()) << "exit get MemTxnKv error" << std::endl; - make_multiple_txn_info_kvs(mem_txn_kv, 10000, 8000); - check_multiple_txn_info_kvs(mem_txn_kv, 10000); + + cloud::config::fdb_cluster_file_path = "/mnt/disk2/lianyukang/doris/fdb_cluster"; + auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + fdb_txn_kv->init(); + + auto txn_kv = fdb_txn_kv.get() ? fdb_txn_kv : mem_txn_kv; + ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl; + make_multiple_txn_info_kvs(txn_kv, 10000, 8000); + check_multiple_txn_info_kvs(txn_kv, 10000); InstanceInfoPB instance; instance.set_instance_id(instance_id); - InstanceRecycler recycler(mem_txn_kv, instance, thread_group, - std::make_shared(mem_txn_kv)); + InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group, + std::make_shared(txn_kv)); ASSERT_EQ(recycler.init(), 0); auto start = std::chrono::steady_clock::now(); ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); @@ -4281,7 +4301,7 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) { std::cout << "recycle expired txn label cost=" << std::chrono::duration_cast(finish - start).count() << "ms" << std::endl; - check_multiple_txn_info_kvs(mem_txn_kv, 2000); + check_multiple_txn_info_kvs(txn_kv, 2000); } } // namespace doris::cloud From 22128e2b7305a2996ef6ae6049102292e5e3ba7e Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 17:51:06 +0800 Subject: [PATCH 8/9] 8 --- cloud/test/recycler_test.cpp | 42 +++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index d96a5ad624490f..8c0dace3fd0c5b 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4070,7 +4070,13 @@ static std::string generate_random_string(int length) { std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10); -// aaa +/** + * Creates key-value pairs for a single transaction + * Includes key-value pairs for RecycleTxnKeyInfo, TxnIndexKey, TxnInfoKey, SubTxnIndex, and TxnLabel + * @param txn_kv Transaction key-value storage object + * @param i Index number used to generate unique IDs + * @param expired_kv_num Number of expired key-value pairs + */ void make_single_txn_related_kvs(std::shared_ptr txn_kv, int64_t i, int64_t expired_kv_num) { std::unique_ptr txn; @@ -4168,7 +4174,13 @@ void make_single_txn_related_kvs(std::shared_ptr txn_kv, int64_t i ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } -// bbb +/** + * Creates multiple transaction info key-value pairs in batches + * Processes in batches of 2000 entries + * @param txn_kv Transaction key-value storage object + * @param total_kv_num Total number of key-value pairs to create + * @param expired_kv_num Number of expired key-value pairs + */ void make_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t total_kv_num, int64_t expired_kv_num) { using namespace doris::cloud; @@ -4179,7 +4191,13 @@ void make_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t to } } -// ccc +/** + * Verifies key-value pairs related to a single transaction + * Validates existence and format of RecycleTxnInfo, TxnIndex, TxnInfo, SubTxnIndex, and TxnLabel + * @param k Key + * @param v Value + * @param txn_kv Transaction key-value storage object + */ void check_single_txn_info_kvs(const std::string_view& k, const std::string_view& v, std::shared_ptr txn_kv) { std::unique_ptr txn; @@ -4226,7 +4244,12 @@ void check_single_txn_info_kvs(const std::string_view& k, const std::string_view txn_info.label(), hex(label_key)); } -// ddd +/** + * Verifies multiple transaction info key-value pairs + * Uses an iterator to traverse and validate key-value pairs within a specified range + * @param txn_kv Transaction key-value storage object + * @param size Expected number of key-value pairs + */ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t size) { using namespace doris::cloud; std::unique_ptr txn; @@ -4266,7 +4289,7 @@ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t s TEST(RecyclerTest, concurrent_recycle_txn_label_test) { doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; - config::recycle_pool_parallelism = 20; + config::recycle_pool_parallelism = 10; auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); s3_producer_pool->start(); auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); @@ -4281,11 +4304,11 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) { auto mem_txn_kv = std::dynamic_pointer_cast(std::make_shared()); - cloud::config::fdb_cluster_file_path = "/mnt/disk2/lianyukang/doris/fdb_cluster"; - auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); - fdb_txn_kv->init(); + // cloud::config::fdb_cluster_file_path = "fdb.cluster"; + // auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + // fdb_txn_kv->init(); - auto txn_kv = fdb_txn_kv.get() ? fdb_txn_kv : mem_txn_kv; + auto txn_kv = mem_txn_kv; ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl; make_multiple_txn_info_kvs(txn_kv, 10000, 8000); check_multiple_txn_info_kvs(txn_kv, 10000); @@ -4303,5 +4326,4 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) { << "ms" << std::endl; check_multiple_txn_info_kvs(txn_kv, 2000); } - } // namespace doris::cloud From 767af3873ba852b4066ba29c1f91cddd1d97e441 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 21:11:12 +0800 Subject: [PATCH 9/9] 9 --- cloud/test/recycler_test.cpp | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 8c0dace3fd0c5b..2383b71b210f17 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4288,21 +4288,24 @@ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t s } TEST(RecyclerTest, concurrent_recycle_txn_label_test) { + config::label_keep_max_second = 259200; doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; config::recycle_pool_parallelism = 10; - auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); - s3_producer_pool->start(); - auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); - recycle_tablet_pool->start(); - auto group_recycle_function_pool = + auto recycle_txn_label_s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); - group_recycle_function_pool->start(); + recycle_txn_label_s3_producer_pool->start(); + auto recycle_txn_label_recycle_tablet_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_recycle_tablet_pool->start(); + auto recycle_txn_label_group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_group_recycle_function_pool->start(); recycle_txn_label_thread_group = - RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), - std::move(group_recycle_function_pool)); - cloud::config::init(nullptr, true); + RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool), + std::move(recycle_txn_label_recycle_tablet_pool), + std::move(recycle_txn_label_group_recycle_function_pool)); - auto mem_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + auto mem_txn_kv = std::make_shared(); // cloud::config::fdb_cluster_file_path = "fdb.cluster"; // auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared());