diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 581ee4067d4c17..6f11455ce00151 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2500,6 +2500,7 @@ int InstanceRecycler::recycle_expired_txn_label() { int64_t num_scanned = 0; int64_t num_expired = 0; int64_t num_recycled = 0; + int ret = 0; RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0}; RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, INT64_MAX}; @@ -2507,6 +2508,7 @@ int InstanceRecycler::recycle_expired_txn_label() { std::string end_recycle_txn_key; recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key); recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key); + std::vector recycle_txn_info_keys; LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_); @@ -2534,12 +2536,15 @@ int InstanceRecycler::recycle_expired_txn_label() { return final_expiration; }; + SyncExecutor concurrent_delete_executor( + _thread_pool_group.s3_producer_pool, + fmt::format("recycle expired txn label, instance id {}", instance_id_), + [](const int& ret) { return ret != 0; }); + int64_t current_time_ms = duration_cast(system_clock::now().time_since_epoch()).count(); - auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, ¤t_time_ms, - &calc_expiration, - this](std::string_view k, std::string_view v) -> int { + auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v) -> int { ++num_scanned; RecycleTxnPB recycle_txn_pb; if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) { @@ -2551,10 +2556,12 @@ int InstanceRecycler::recycle_expired_txn_label() { (calc_expiration(recycle_txn_pb) <= current_time_ms)) { VLOG_DEBUG << "found recycle txn, key=" << hex(k); num_expired++; - } else { - return 0; + recycle_txn_info_keys.emplace_back(k); } + return 0; + }; + auto delete_recycle_txn_kv = [&](const std::string& k) -> int { std::string_view k1 = k; //RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id k1.remove_prefix(1); // Remove key space @@ -2638,8 +2645,41 @@ int InstanceRecycler::recycle_expired_txn_label() { return 0; }; + auto loop_done = [&]() -> int { + for (const auto& k : recycle_txn_info_keys) { + concurrent_delete_executor.add([&]() { + if (delete_recycle_txn_kv(k) != 0) { + LOG_WARNING("failed to delete recycle txn kv") + .tag("instance id", instance_id_) + .tag("key", hex(k)); + return -1; + } + return 0; + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { + ret = -1; + } + } + + ret = finished ? ret : -1; + + if (ret != 0) { + LOG_WARNING("recycle txn kv ret!=0") + .tag("finished", finished) + .tag("ret", ret) + .tag("instance_id", instance_id_); + return ret; + } + recycle_txn_info_keys.clear(); + return ret; + }; + return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, - std::move(handle_recycle_txn_kv)); + std::move(handle_recycle_txn_kv), std::move(loop_done)); } struct CopyJobIdTuple { diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 5b9203b1d4ef79..2383b71b210f17 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -17,11 +17,13 @@ #include "recycler/recycler.h" +#include #include #include #include #include +#include #include #include #include @@ -35,11 +37,13 @@ #include "meta-service/keys.h" #include "meta-service/mem_txn_kv.h" #include "meta-service/meta_service.h" +#include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" #include "mock_accessor.h" #include "mock_resource_manager.h" #include "rate-limiter/rate_limiter.h" #include "recycler/checker.h" +#include "recycler/recycler.cpp" #include "recycler/storage_vault_accessor.h" #include "recycler/util.h" #include "recycler/white_black_list.h" @@ -4051,4 +4055,278 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) { } } +static std::string generate_random_string(int length) { + std::string char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_int_distribution distribution(0, char_set.length() - 1); + + std::string randomString; + for (int i = 0; i < length; ++i) { + randomString += char_set[distribution(generator)]; + } + return randomString; +} + +std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10); + +/** + * Creates key-value pairs for a single transaction + * Includes key-value pairs for RecycleTxnKeyInfo, TxnIndexKey, TxnInfoKey, SubTxnIndex, and TxnLabel + * @param txn_kv Transaction key-value storage object + * @param i Index number used to generate unique IDs + * @param expired_kv_num Number of expired key-value pairs + */ +void make_single_txn_related_kvs(std::shared_ptr txn_kv, int64_t i, + int64_t expired_kv_num) { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + std::string recycle_txn_info_key; + std::string recycle_txn_info_val; + int64_t db_id = i; + int64_t txn_id = 1000000 + i; + int64_t sub_txn_id = 2000000 + i; + int64_t current_time = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // RecycleTxnKeyInfo -> RecycleTxnPB + RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id}; + recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key); + RecycleTxnPB recycle_txn_pb; + if (i < expired_kv_num) { + recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L); + } else { + recycle_txn_pb.set_creation_time(current_time); + } + recycle_txn_pb.set_label("recycle_txn_key_info_label_" + std::to_string(i)); + if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) { + LOG_WARNING("failed to serialize recycle txn info") + .tag("key", hex(recycle_txn_info_key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return; + } + LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + recycle_txn_pb.label(), hex(recycle_txn_info_key)); + txn->put(recycle_txn_info_key, recycle_txn_info_val); + + // TxnIndexKey -> TxnIndexPB + std::string txn_idx_key = txn_index_key({instance_id, txn_id}); + std::string txn_idx_val; + TxnIndexPB txn_index_pb; + if (!txn_index_pb.SerializeToString(&txn_idx_val)) { + LOG_WARNING("failed to serialize txn index") + .tag("key", hex(txn_idx_key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return; + } + txn->put(txn_idx_key, txn_idx_val); + + // TxnInfoKey -> TxnInfoPB + std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + std::string info_val; + TxnInfoPB txn_info_pb; + txn_info_pb.add_sub_txn_ids(sub_txn_id); + txn_info_pb.set_label("txn_info_label_" + std::to_string(i)); + if (!txn_info_pb.SerializeToString(&info_val)) { + LOG_WARNING("failed to serialize txn info") + .tag("key", hex(info_key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return; + } + txn->put(info_key, info_val); + + // SubTxnIndex -> TxnIndexPB + std::string idx_key = txn_index_key({instance_id, sub_txn_id}); + std::string idx_val; + TxnIndexPB sub_txn_index_pb; + if (!sub_txn_index_pb.SerializeToString(&idx_val)) { + LOG_WARNING("failed to serialize sub txn index") + .tag("key", hex(idx_key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return; + } + txn->put(idx_key, idx_val); + + // TxnLabel -> TxnLabelPB + std::string label_key; + std::string label_val; + txn_label_key({instance_id, db_id, txn_info_pb.label()}, &label_key); + TxnLabelPB txn_label_pb; + txn_label_pb.add_txn_ids(txn_id); + LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + txn_info_pb.label(), hex(label_key)); + if (!txn_label_pb.SerializeToString(&label_val)) { + LOG_WARNING("failed to serialize txn label") + .tag("key", hex(label_key)) + .tag("db_id", db_id) + .tag("txn_id", txn_id); + return; + } + MemTxnKv::gen_version_timestamp(123456790, 0, &label_val); + txn->put(label_key, label_val); + + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +/** + * Creates multiple transaction info key-value pairs in batches + * Processes in batches of 2000 entries + * @param txn_kv Transaction key-value storage object + * @param total_kv_num Total number of key-value pairs to create + * @param expired_kv_num Number of expired key-value pairs + */ +void make_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t total_kv_num, + int64_t expired_kv_num) { + using namespace doris::cloud; + for (int64_t i = 0; i < total_kv_num; i += 2000) { + for (int64_t j = i; j < i + 2000; j++) { + make_single_txn_related_kvs(txn_kv, j, expired_kv_num); + } + } +} + +/** + * Verifies key-value pairs related to a single transaction + * Validates existence and format of RecycleTxnInfo, TxnIndex, TxnInfo, SubTxnIndex, and TxnLabel + * @param k Key + * @param v Value + * @param txn_kv Transaction key-value storage object + */ +void check_single_txn_info_kvs(const std::string_view& k, const std::string_view& v, + std::shared_ptr txn_kv) { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + // check RecycleTxnInfo + RecycleTxnPB recycle_txn_pb; + ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size())); + std::string_view k1 = k; + + // check TxnIndex + std::string index_key, index_value; + k1.remove_prefix(1); // Remove key space + std::vector, int, int>> out; + ASSERT_EQ(decode_key(&k1, &out), 0); + int64_t db_id = std::get(std::get<0>(out[3])); + int64_t txn_id = std::get(std::get<0>(out[4])); + index_key = txn_index_key({instance_id, txn_id}); + ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK); + + // check TxnInfo + std::string info_key, info_val; + txn_info_key({instance_id, db_id, txn_id}, &info_key); + ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); + + // check SubTxnIndex + TxnInfoPB txn_info; + ASSERT_TRUE(txn_info.ParseFromString(info_val)); + std::vector sub_txn_index_keys; + std::string sub_txn_index_value; + for (auto sub_txn_id : txn_info.sub_txn_ids()) { + auto sub_txn_index_key = txn_index_key({instance_id, sub_txn_id}); + sub_txn_index_keys.push_back(sub_txn_index_key); + } + for (auto& sub_txn_index_key : sub_txn_index_keys) { + ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value), TxnErrorCode::TXN_OK); + } + + // check TxnLabel + std::string label_key, label_val; + txn_label_key({instance_id, db_id, txn_info.label()}, &label_key); + ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK) + << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id, + txn_info.label(), hex(label_key)); +} + +/** + * Verifies multiple transaction info key-value pairs + * Uses an iterator to traverse and validate key-value pairs within a specified range + * @param txn_kv Transaction key-value storage object + * @param size Expected number of key-value pairs + */ +void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t size) { + using namespace doris::cloud; + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0}; + RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, INT64_MAX}; + std::string begin = recycle_txn_key(recycle_txn_key_info0); + std::string end = recycle_txn_key(recycle_txn_key_info1); + int64_t total_kv = 0; + + std::unique_ptr it; + do { + int get_ret = txn_get(txn_kv.get(), begin, end, it); + if (get_ret != 0) { // txn kv may complain "Request for future version" + LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) + << ") txn_get_ret=" << get_ret; + ASSERT_TRUE(false); + } + if (!it->has_next()) { + LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; + break; // scan finished + } + while (it->has_next()) { + // recycle corresponding resources + auto [k, v] = it->next(); + if (!it->has_next()) { + begin = k; + VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); + } + check_single_txn_info_kvs(k, v, txn_kv); + total_kv++; + } + begin.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more()); + ASSERT_EQ(total_kv, size); +} + +TEST(RecyclerTest, concurrent_recycle_txn_label_test) { + config::label_keep_max_second = 259200; + doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; + config::recycle_pool_parallelism = 10; + auto recycle_txn_label_s3_producer_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_s3_producer_pool->start(); + auto recycle_txn_label_recycle_tablet_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_recycle_tablet_pool->start(); + auto recycle_txn_label_group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_group_recycle_function_pool->start(); + recycle_txn_label_thread_group = + RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool), + std::move(recycle_txn_label_recycle_tablet_pool), + std::move(recycle_txn_label_group_recycle_function_pool)); + + auto mem_txn_kv = std::make_shared(); + + // cloud::config::fdb_cluster_file_path = "fdb.cluster"; + // auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + // fdb_txn_kv->init(); + + auto txn_kv = mem_txn_kv; + ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl; + make_multiple_txn_info_kvs(txn_kv, 10000, 8000); + check_multiple_txn_info_kvs(txn_kv, 10000); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto start = std::chrono::steady_clock::now(); + ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); + auto finish = std::chrono::steady_clock::now(); + std::cout << "recycle expired txn label cost=" + << std::chrono::duration_cast(finish - start).count() + << "ms" << std::endl; + check_multiple_txn_info_kvs(txn_kv, 2000); +} } // namespace doris::cloud