Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2487,13 +2487,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
int64_t num_scanned = 0;
int64_t num_expired = 0;
int64_t num_recycled = 0;
int ret = 0;

RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, INT64_MAX};
std::string begin_recycle_txn_key;
std::string end_recycle_txn_key;
recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key);
recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key);
std::vector<std::string> recycle_txn_info_keys;

LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_);

Expand Down Expand Up @@ -2521,12 +2523,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
return final_expiration;
};

SyncExecutor<int> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
fmt::format("recycle expired txn label, instance id {}", instance_id_),
[](const int& ret) { return ret != 0; });

int64_t current_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, &current_time_ms,
&calc_expiration,
this](std::string_view k, std::string_view v) -> int {
auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v) -> int {
++num_scanned;
RecycleTxnPB recycle_txn_pb;
if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
Expand All @@ -2538,10 +2543,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
(calc_expiration(recycle_txn_pb) <= current_time_ms)) {
VLOG_DEBUG << "found recycle txn, key=" << hex(k);
num_expired++;
} else {
return 0;
recycle_txn_info_keys.emplace_back(k);
}
return 0;
};

auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
k1.remove_prefix(1); // Remove key space
Expand Down Expand Up @@ -2625,8 +2632,41 @@ int InstanceRecycler::recycle_expired_txn_label() {
return 0;
};

auto loop_done = [&]() -> int {
for (const auto& k : recycle_txn_info_keys) {
concurrent_delete_executor.add([&]() {
if (delete_recycle_txn_kv(k) != 0) {
LOG_WARNING("failed to delete recycle txn kv")
.tag("instance id", instance_id_)
.tag("key", hex(k));
return -1;
}
return 0;
});
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
if (r != 0) {
ret = -1;
}
}

ret = finished ? ret : -1;

if (ret != 0) {
LOG_WARNING("recycle txn kv ret!=0")
.tag("finished", finished)
.tag("ret", ret)
.tag("instance_id", instance_id_);
return ret;
}
recycle_txn_info_keys.clear();
return ret;
};

return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
std::move(handle_recycle_txn_kv));
std::move(handle_recycle_txn_kv), std::move(loop_done));
}

struct CopyJobIdTuple {
Expand Down
278 changes: 278 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#include "recycler/recycler.h"

#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>

#include <chrono>
#include <cstdint>
#include <memory>
#include <random>
#include <string>
Expand All @@ -35,11 +37,13 @@
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "mock_accessor.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/checker.h"
#include "recycler/recycler.cpp"
#include "recycler/storage_vault_accessor.h"
#include "recycler/util.h"
#include "recycler/white_black_list.h"
Expand Down Expand Up @@ -4007,4 +4011,278 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) {
}
}

static std::string generate_random_string(int length) {
std::string char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);

std::string randomString;
for (int i = 0; i < length; ++i) {
randomString += char_set[distribution(generator)];
}
return randomString;
}

std::string instance_id = "concurrent_recycle_txn_label_test_" + generate_random_string(10);

/**
* Creates key-value pairs for a single transaction
* Includes key-value pairs for RecycleTxnKeyInfo, TxnIndexKey, TxnInfoKey, SubTxnIndex, and TxnLabel
* @param txn_kv Transaction key-value storage object
* @param i Index number used to generate unique IDs
* @param expired_kv_num Number of expired key-value pairs
*/
void make_single_txn_related_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t i,
int64_t expired_kv_num) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);

std::string recycle_txn_info_key;
std::string recycle_txn_info_val;
int64_t db_id = i;
int64_t txn_id = 1000000 + i;
int64_t sub_txn_id = 2000000 + i;
int64_t current_time = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();

// RecycleTxnKeyInfo -> RecycleTxnPB
RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key);
RecycleTxnPB recycle_txn_pb;
if (i < expired_kv_num) {
recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
} else {
recycle_txn_pb.set_creation_time(current_time);
}
recycle_txn_pb.set_label("recycle_txn_key_info_label_" + std::to_string(i));
if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) {
LOG_WARNING("failed to serialize recycle txn info")
.tag("key", hex(recycle_txn_info_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id);
return;
}
LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
recycle_txn_pb.label(), hex(recycle_txn_info_key));
txn->put(recycle_txn_info_key, recycle_txn_info_val);

// TxnIndexKey -> TxnIndexPB
std::string txn_idx_key = txn_index_key({instance_id, txn_id});
std::string txn_idx_val;
TxnIndexPB txn_index_pb;
if (!txn_index_pb.SerializeToString(&txn_idx_val)) {
LOG_WARNING("failed to serialize txn index")
.tag("key", hex(txn_idx_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id);
return;
}
txn->put(txn_idx_key, txn_idx_val);

// TxnInfoKey -> TxnInfoPB
std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val;
TxnInfoPB txn_info_pb;
txn_info_pb.add_sub_txn_ids(sub_txn_id);
txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
if (!txn_info_pb.SerializeToString(&info_val)) {
LOG_WARNING("failed to serialize txn info")
.tag("key", hex(info_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id);
return;
}
txn->put(info_key, info_val);

// SubTxnIndex -> TxnIndexPB
std::string idx_key = txn_index_key({instance_id, sub_txn_id});
std::string idx_val;
TxnIndexPB sub_txn_index_pb;
if (!sub_txn_index_pb.SerializeToString(&idx_val)) {
LOG_WARNING("failed to serialize sub txn index")
.tag("key", hex(idx_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id);
return;
}
txn->put(idx_key, idx_val);

// TxnLabel -> TxnLabelPB
std::string label_key;
std::string label_val;
txn_label_key({instance_id, db_id, txn_info_pb.label()}, &label_key);
TxnLabelPB txn_label_pb;
txn_label_pb.add_txn_ids(txn_id);
LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
txn_info_pb.label(), hex(label_key));
if (!txn_label_pb.SerializeToString(&label_val)) {
LOG_WARNING("failed to serialize txn label")
.tag("key", hex(label_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id);
return;
}
MemTxnKv::gen_version_timestamp(123456790, 0, &label_val);
txn->put(label_key, label_val);

ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

/**
* Creates multiple transaction info key-value pairs in batches
* Processes in batches of 2000 entries
* @param txn_kv Transaction key-value storage object
* @param total_kv_num Total number of key-value pairs to create
* @param expired_kv_num Number of expired key-value pairs
*/
void make_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t total_kv_num,
int64_t expired_kv_num) {
using namespace doris::cloud;
for (int64_t i = 0; i < total_kv_num; i += 2000) {
for (int64_t j = i; j < i + 2000; j++) {
make_single_txn_related_kvs(txn_kv, j, expired_kv_num);
}
}
}

/**
* Verifies key-value pairs related to a single transaction
* Validates existence and format of RecycleTxnInfo, TxnIndex, TxnInfo, SubTxnIndex, and TxnLabel
* @param k Key
* @param v Value
* @param txn_kv Transaction key-value storage object
*/
void check_single_txn_info_kvs(const std::string_view& k, const std::string_view& v,
std::shared_ptr<cloud::TxnKv> txn_kv) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);

// check RecycleTxnInfo
RecycleTxnPB recycle_txn_pb;
ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size()));
std::string_view k1 = k;

// check TxnIndex
std::string index_key, index_value;
k1.remove_prefix(1); // Remove key space
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
ASSERT_EQ(decode_key(&k1, &out), 0);
int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
index_key = txn_index_key({instance_id, txn_id});
ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK);

// check TxnInfo
std::string info_key, info_val;
txn_info_key({instance_id, db_id, txn_id}, &info_key);
ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);

// check SubTxnIndex
TxnInfoPB txn_info;
ASSERT_TRUE(txn_info.ParseFromString(info_val));
std::vector<std::string> sub_txn_index_keys;
std::string sub_txn_index_value;
for (auto sub_txn_id : txn_info.sub_txn_ids()) {
auto sub_txn_index_key = txn_index_key({instance_id, sub_txn_id});
sub_txn_index_keys.push_back(sub_txn_index_key);
}
for (auto& sub_txn_index_key : sub_txn_index_keys) {
ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value), TxnErrorCode::TXN_OK);
}

// check TxnLabel
std::string label_key, label_val;
txn_label_key({instance_id, db_id, txn_info.label()}, &label_key);
ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK)
<< fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
txn_info.label(), hex(label_key));
}

/**
* Verifies multiple transaction info key-value pairs
* Uses an iterator to traverse and validate key-value pairs within a specified range
* @param txn_kv Transaction key-value storage object
* @param size Expected number of key-value pairs
*/
void check_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t size) {
using namespace doris::cloud;
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0};
RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, INT64_MAX};
std::string begin = recycle_txn_key(recycle_txn_key_info0);
std::string end = recycle_txn_key(recycle_txn_key_info1);
int64_t total_kv = 0;

std::unique_ptr<RangeGetIterator> it;
do {
int get_ret = txn_get(txn_kv.get(), begin, end, it);
if (get_ret != 0) { // txn kv may complain "Request for future version"
LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end)
<< ") txn_get_ret=" << get_ret;
ASSERT_TRUE(false);
}
if (!it->has_next()) {
LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")";
break; // scan finished
}
while (it->has_next()) {
// recycle corresponding resources
auto [k, v] = it->next();
if (!it->has_next()) {
begin = k;
VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
}
check_single_txn_info_kvs(k, v, txn_kv);
total_kv++;
}
begin.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
ASSERT_EQ(total_kv, size);
}

TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
config::label_keep_max_second = 259200;
doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
config::recycle_pool_parallelism = 10;
auto recycle_txn_label_s3_producer_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_s3_producer_pool->start();
auto recycle_txn_label_recycle_tablet_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_recycle_tablet_pool->start();
auto recycle_txn_label_group_recycle_function_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_group_recycle_function_pool->start();
recycle_txn_label_thread_group =
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
std::move(recycle_txn_label_recycle_tablet_pool),
std::move(recycle_txn_label_group_recycle_function_pool));

auto mem_txn_kv = std::make_shared<MemTxnKv>();

// cloud::config::fdb_cluster_file_path = "fdb.cluster";
// auto fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
// fdb_txn_kv->init();

auto txn_kv = mem_txn_kv;
ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl;
make_multiple_txn_info_kvs(txn_kv, 10000, 8000);
check_multiple_txn_info_kvs(txn_kv, 10000);

InstanceInfoPB instance;
instance.set_instance_id(instance_id);
InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
std::make_shared<TxnLazyCommitter>(txn_kv));
ASSERT_EQ(recycler.init(), 0);
auto start = std::chrono::steady_clock::now();
ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
auto finish = std::chrono::steady_clock::now();
std::cout << "recycle expired txn label cost="
<< std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 2000);
}
} // namespace doris::cloud
Loading