From e194bddf8acf7edff93488b7fe4d5b69076bcb28 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Mon, 12 May 2025 18:03:40 +0800 Subject: [PATCH] [Fix](recycler) Fix transaction label recycling to prevent key cleanup failures and 'key not found' errors (#50766) Related PR: #50037 If an error occurs during transaction label recycling, the vector that records the keys to recycle is not cleared. Keys that were already deleted in the previous scan-and-recycle cycle are therefore carried over into the next cycle, causing a large number of 'key not found' errors. --- cloud/src/recycler/recycler.cpp | 8 ++++- cloud/test/recycler_test.cpp | 55 +++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 0cee75994e65fa..307beab63d40b0 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2635,6 +2635,11 @@ int InstanceRecycler::recycle_expired_txn_label() { }; auto loop_done = [&]() -> int { + std::unique_ptr> defer( + (int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); }); + TEST_SYNC_POINT_CALLBACK( + "InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys", + &recycle_txn_info_keys); for (const auto& k : recycle_txn_info_keys) { concurrent_delete_executor.add([&]() { if (delete_recycle_txn_kv(k) != 0) { @@ -2656,6 +2661,8 @@ int InstanceRecycler::recycle_expired_txn_label() { ret = finished ? 
ret : -1; + TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure", &ret); + if (ret != 0) { LOG_WARNING("recycle txn kv ret!=0") .tag("finished", finished) @@ -2663,7 +2670,6 @@ int InstanceRecycler::recycle_expired_txn_label() { .tag("instance_id", instance_id_); return ret; } - recycle_txn_info_keys.clear(); return ret; }; diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 6ef782b9c5fb5e..bcd7dd39160651 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4285,4 +4285,59 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) { << "ms" << std::endl; check_multiple_txn_info_kvs(txn_kv, 2000); } + +TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) { + config::label_keep_max_second = 259200; + doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; + config::recycle_pool_parallelism = 40; + auto recycle_txn_label_s3_producer_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_s3_producer_pool->start(); + auto recycle_txn_label_recycle_tablet_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_recycle_tablet_pool->start(); + auto recycle_txn_label_group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_group_recycle_function_pool->start(); + recycle_txn_label_thread_group = + RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool), + std::move(recycle_txn_label_recycle_tablet_pool), + std::move(recycle_txn_label_group_recycle_function_pool)); + + auto mem_txn_kv = std::make_shared(); + + auto txn_kv = mem_txn_kv; + ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl; + make_multiple_txn_info_kvs(txn_kv, 20000, 15000); + check_multiple_txn_info_kvs(txn_kv, 20000); + + auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer( + (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); + 
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys", + [](auto&& args) { + auto* recycle_txn_info_keys = + try_any_cast*>(args[0]); + + ASSERT_LE(recycle_txn_info_keys->size(), 10000); + }); + sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure", [](auto&& args) { + auto* ret = try_any_cast(args[0]); + *ret = -1; + }); + sp->enable_processing(); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto start = std::chrono::steady_clock::now(); + ASSERT_EQ(recycler.recycle_expired_txn_label(), -1); + auto finish = std::chrono::steady_clock::now(); + std::cout << "recycle expired txn label cost=" + << std::chrono::duration_cast(finish - start).count() + << "ms" << std::endl; + check_multiple_txn_info_kvs(txn_kv, 5000); +} } // namespace doris::cloud