From 0ad6e5f6560d568133314d4e87a77aea1c7d5c53 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 10 Jan 2025 21:10:03 +0800 Subject: [PATCH 1/7] 1 --- cloud/src/recycler/recycler.cpp | 159 ++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 40 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index ba67711866bb0a..c4153abbe5a1a7 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -30,6 +30,7 @@ #include #include "common/stopwatch.h" +#include "meta-service/meta_service.h" #include "meta-service/meta_service_schema.h" #include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" @@ -249,8 +250,9 @@ void Recycler::recycle_callback() { auto instance_recycler = std::make_shared( txn_kv_, instance, _thread_pool_group, txn_lazy_committer_); - if (instance_recycler->init() != 0) { - LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id; + if (int r = instance_recycler->init(); r != 0) { + LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id + << " ret=" << r; continue; } std::string recycle_job_key; @@ -258,6 +260,8 @@ void Recycler::recycle_callback() { int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id, ip_port_, config::recycle_interval_seconds * 1000); if (ret != 0) { // Prepare failed + LOG(WARNING) << "failed to prepare recycle_job, instance_id=" << instance_id + << " ret=" << ret; continue; } else { std::lock_guard lock(mtx_); @@ -276,7 +280,12 @@ void Recycler::recycle_callback() { std::lock_guard lock(mtx_); recycling_instance_map_.erase(instance_id); } - LOG_INFO("finish recycle instance").tag("instance_id", instance_id); + auto elpased_ms = + ctime_ms - + duration_cast(system_clock::now().time_since_epoch()).count(); + LOG_INFO("finish recycle instance") + .tag("instance_id", instance_id) + .tag("cost_ms", elpased_ms); } } @@ -529,8 +538,9 @@ int InstanceRecycler::init_storage_vault_accessors() { int ret = accessor->init(); if (ret != 0) { LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_ - << " resource_id=" << vault.id() << " name=" << vault.name(); - return ret; + << " resource_id=" << vault.id() << " name=" << vault.name() + << " hdfs_vault=" << vault.hdfs_info().DebugString(); + continue; } accessor_map_.emplace(vault.id(), std::move(accessor)); @@ -540,16 +550,18 @@ int InstanceRecycler::init_storage_vault_accessors() { #else auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info()); if (!s3_conf) { - LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_; - return -1; + LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id=" + << instance_id_ << " s3_vault=" << vault.obj_info().DebugString(); + continue; } std::shared_ptr accessor; int ret = S3Accessor::create(std::move(*s3_conf), &accessor); if (ret != 0) { LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ - << " resource_id=" << vault.id() << " name=" << vault.name(); - return ret; + << " resource_id=" << vault.id() << " name=" << vault.name() + << " ret=" << ret << " s3_vault=" << vault.obj_info().DebugString(); + continue; } #endif @@ -562,6 +574,13 @@ int InstanceRecycler::init_storage_vault_accessors() { return -1; } + if (accessor_map_.empty()) { + LOG(WARNING) << "no accessors for instance=" << instance_id_; + return -2; + } + LOG_INFO("finish init instance recycler number_accessors={} instance=", accessor_map_.size(), + instance_id_); + return 0; } @@ -1461,7 +1480,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector int { + DCHECK(accessor_map_.count(*rid)) + << "uninitilized accessor, instance_id=" << instance_id_ + << " resource_id=" << resource_id << " path[0]=" << (*paths)[0]; auto& accessor = accessor_map_[*rid]; - DCHECK(accessor); return accessor->delete_files(*paths); }); } @@ -1576,7 +1598,9 @@ int InstanceRecycler::delete_rowset_data(const std::string& resource_id, int64_t if (it == accessor_map_.end()) { LOG_WARNING("instance has no such resource id") .tag("instance_id", instance_id_) - .tag("resource_id", resource_id); + .tag("resource_id", resource_id) + .tag("tablet_id", tablet_id) + .tag("rowset_id", rowset_id); return -1; } auto& accessor = it->second; @@ -1588,59 +1612,93 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("instance_id", instance_id_) .tag("tablet_id", tablet_id); + int ret = 0; auto start_time = steady_clock::now(); - std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration(steady_clock::now() - start_time).count(); - LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost) - .tag("instance_id", instance_id_) - .tag("tablet_id", tablet_id); - }); + std::unique_ptr txn; + if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to delete rowset kv of tablet ") + .tag("tablet id", tablet_id) + .tag("reason", "failed to create txn"); + ret = -1; + } - // delete all rowset kv in this tablet + // collect resource ids std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0}); std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0}); std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""}); std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""}); - int ret = 0; - std::unique_ptr txn; - if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id; - ret = -1; - } - txn->remove(rs_key0, rs_key1); - txn->remove(recyc_rs_key0, recyc_rs_key1); + std::set resource_ids; - // remove delete bitmap for MoW table - std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id}); - txn->remove(pending_key); - std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0}); - std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0}); - txn->remove(delete_bitmap_start, delete_bitmap_end); + std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { + auto cost = duration(steady_clock::now() - start_time).count(); + LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost) + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id) + .tag("ret", ret); + }); - TxnErrorCode err = txn->commit(); - if (err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err; + GetRowsetResponse resp; + std::string msg; + MetaServiceCode code = MetaServiceCode::OK; + // get rowsets in tablet + internal_get_rowset(txn.get(), 0, std::numeric_limits::max() - 1, instance_id_, + tablet_id, code, msg, &resp); + if (code != MetaServiceCode::OK) { + LOG_WARNING("failed to delete rowset kv of tablet ") + .tag("tablet id", tablet_id) + .tag("reason", "failed to internal get rowset") + .tag("msg", msg) + .tag("code", code); ret = -1; } + for (const auto& rs_meta : resp.rowset_meta()) { + if (!rs_meta.has_resource_id()) { + continue; + } + auto it = accessor_map_.find(rs_meta.resource_id()); + // possible if the accessor is not initilized correctly + if (it == accessor_map_.end()) [[unlikely]] { + LOG_WARNING( + "failed to find resource id when recycle tablet, skip this vault accessor " + "recycle process") + .tag("tablet id", tablet_id) + .tag("instance_id", instance_id_) + .tag("resource_id", rs_meta.resource_id()) + .tag("rowset meta pb", rs_meta.DebugString()); + continue; + } + resource_ids.emplace(rs_meta.resource_id()); + } + + LOG_INFO("recycle tablet resource ids are") + .tag("", std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + "," + b; + })); + SyncExecutor concurrent_delete_executor( _thread_pool_group.s3_producer_pool, fmt::format("delete tablet {} s3 rowset", tablet_id), [](const int& ret) { return ret != 0; }); // delete all rowset data in this tablet - for (auto& [_, accessor] : accessor_map_) { - concurrent_delete_executor.add([&, accessor_ptr = &accessor]() { - if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) { + // ATTN: there may be data leak if not all accessor initilized successfully + // partial data deleted if the tablet is stored cross-storage vault + // vault id is not attached to TabletMeta... + for (const auto& resource_id : resource_ids) { + concurrent_delete_executor.add([&, accessor_ptr = accessor_map_[resource_id]]() { + if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)) != 0) { LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id - << " s3_path=" << accessor->uri(); + << " path=" << accessor_ptr->uri(); return -1; } return 0; }); } + bool finished = true; std::vector rets = concurrent_delete_executor.when_all(&finished); for (int r : rets) { @@ -1651,6 +1709,27 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = finished ? ret : -1; + if (ret != 0) { // failed recycle tablet data + return ret; + } + + // delete all rowset kv in this tablet + txn->remove(rs_key0, rs_key1); + txn->remove(recyc_rs_key0, recyc_rs_key1); + + // remove delete bitmap for MoW table + std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id}); + txn->remove(pending_key); + std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0}); + std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0}); + txn->remove(delete_bitmap_start, delete_bitmap_end); + + TxnErrorCode err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err; + ret = -1; + } + if (ret == 0) { // All object files under tablet have been deleted std::lock_guard lock(recycled_tablets_mtx_); From 55ee8b8975f33115879849e0035800a6af149ef0 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 10 Jan 2025 21:14:00 +0800 Subject: [PATCH 2/7] 2 Co-authored-by: Gavin Chou --- cloud/src/recycler/recycler.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index c4153abbe5a1a7..cd9f3a722a9055 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1646,9 +1646,8 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { internal_get_rowset(txn.get(), 0, std::numeric_limits::max() - 1, instance_id_, tablet_id, code, msg, &resp); if (code != MetaServiceCode::OK) { - LOG_WARNING("failed to delete rowset kv of tablet ") + LOG_WARNING("failed to get rowsets of tablet when recycle tablet") .tag("tablet id", tablet_id) - .tag("reason", "failed to internal get rowset") .tag("msg", msg) .tag("code", code); ret = -1; From 03d430f649b50934035d57fa4a1f1dbb073b6f72 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 14 Jan 2025 21:49:56 +0800 Subject: [PATCH 3/7] 3 --- cloud/src/recycler/recycler.cpp | 7 +- cloud/test/recycler_test.cpp | 131 ++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 4 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index cd9f3a722a9055..3db1c62d7b3bfe 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -532,6 +532,8 @@ int InstanceRecycler::init_storage_vault_accessors() { LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k); return -1; } + TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault", + &accessor_map_, &vault); if (vault.has_hdfs_info()) { auto accessor = std::make_shared(vault.hdfs_info()); @@ -545,9 +547,6 @@ int InstanceRecycler::init_storage_vault_accessors() { accessor_map_.emplace(vault.id(), std::move(accessor)); } else if (vault.has_obj_info()) { -#ifdef UNIT_TEST - auto accessor = std::make_shared(); -#else auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info()); if (!s3_conf) { LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id=" @@ -563,7 +562,6 @@ int InstanceRecycler::init_storage_vault_accessors() { << " ret=" << ret << " s3_vault=" << vault.obj_info().DebugString(); continue; } -#endif accessor_map_.emplace(vault.id(), std::move(accessor)); } @@ -1652,6 +1650,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("code", code); ret = -1; } + TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp); for (const auto& rs_meta : resp.rowset_meta()) { if (!rs_meta.has_resource_id()) { diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index e38d25aaa8420a..567d27f5d6f3c4 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -36,6 +36,7 @@ #include "meta-service/mem_txn_kv.h" #include "meta-service/meta_service.h" #include "meta-service/txn_kv_error.h" +#include "mock_accessor.h" #include "mock_resource_manager.h" #include "rate-limiter/rate_limiter.h" #include "recycler/checker.h" @@ -3263,4 +3264,134 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) { } } +TEST(RecyclerTest, init_vault_accessor_failed_test) { + auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [&sp](int*) { + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + }); + + auto txn_kv = std::make_shared(); + EXPECT_EQ(txn_kv->init(), 0); + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + InstanceInfoPB instance; + instance.set_instance_id("GetObjStoreInfoTestInstance"); + // failed to init because S3Conf::from_obj_store_info() fails + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("test_failed_s3_vault_1"); + vault.set_id("failed_s3_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString()); + } + + // succeed to init but unuseful + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("test_failed_s3_vault_2"); + vault.set_id("failed_s3_2"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); + } + + // failed to init because accessor->init() fails + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("test_failed_hdfs_vault_1"); + vault.set_id("failed_hdfs_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString()); + } + + auto accessor = std::make_shared(); + EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); + sp->set_call_back( + "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { + auto* map = try_any_cast< + std::unordered_map>*>( + args[0]); + auto* vault = try_any_cast(args[1]); + if (vault->name() == "test_success_hdfs_vault") { + map->emplace(vault->id(), accessor); + } + }); + sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) { + auto* resp = try_any_cast(args[0]); + auto* rs = resp->add_rowset_meta(); + rs->set_resource_id("failed_s3_2"); + rs = resp->add_rowset_meta(); + rs->set_resource_id("success_vault"); + }); + sp->enable_processing(); + + // succeed to init MockAccessor + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("test_success_hdfs_vault"); + vault.set_id("success_vault"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); + } + + val = instance.SerializeAsString(); + txn->put(key, val); + EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 2); + + // unuseful obj accessor + EXPECT_EQ(recycler.accessor_map_.at("failed_s3_2")->exists("data/0/test.csv"), -1); + // useful mock accessor + EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0); + + // recycle tablet will fail because unuseful obj accessor can not connectted + EXPECT_EQ(recycler.recycle_tablet(0), -1); + // however, useful mock accessor can recycle tablet + EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 1); +} + } // namespace doris::cloud From 17e7c880835efb86e55086e1c2df83e6967375c0 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 15 Jan 2025 16:11:57 +0800 Subject: [PATCH 4/7] 4 --- cloud/src/recycler/recycler.cpp | 52 +++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 3db1c62d7b3bfe..692fe39b025abb 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -1628,12 +1629,30 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""}); std::set resource_ids; + int64_t recycle_rowsets_number = 0; + int64_t recycle_segments_number = 0; + int64_t recycle_rowsets_data_size = 0; + int64_t recycle_rowsets_index_size = 0; + int64_t max_rowset_version = 0; + int64_t min_rowset_creation_time = INT64_MAX; + int64_t max_rowset_creation_time = 0; + int64_t min_rowset_expiration_time = INT64_MAX; + int64_t max_rowset_expiration_time = 0; std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { auto cost = duration(steady_clock::now() - start_time).count(); LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("tablet_id", tablet_id) + .tag("recycle rowsets number", recycle_rowsets_number) + .tag("recycle segments number", recycle_segments_number) + .tag("all rowsets recycle data size", recycle_rowsets_data_size) + .tag("all rowsets recycle index size", recycle_rowsets_index_size) + .tag("max rowset version", max_rowset_version) + .tag("min rowset creation time", min_rowset_creation_time) + .tag("max rowset creation time", max_rowset_creation_time) + .tag("min rowset expiration time", min_rowset_expiration_time) + .tag("max rowset expiration time", max_rowset_expiration_time) .tag("ret", ret); }); @@ -1647,14 +1666,17 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { LOG_WARNING("failed to get rowsets of tablet when recycle tablet") .tag("tablet id", tablet_id) .tag("msg", msg) - .tag("code", code); + .tag("code", code) + .tag("instance id", instance_id_); ret = -1; } TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp); for (const auto& rs_meta : resp.rowset_meta()) { if (!rs_meta.has_resource_id()) { - continue; + LOG_WARNING("rowset meta does not have a resource id, impossible!") + .tag("rs_meta", rs_meta.ShortDebugString()); + return -1; } auto it = accessor_map_.find(rs_meta.resource_id()); // possible if the accessor is not initilized correctly @@ -1665,17 +1687,29 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("tablet id", tablet_id) .tag("instance_id", instance_id_) .tag("resource_id", rs_meta.resource_id()) - .tag("rowset meta pb", rs_meta.DebugString()); - continue; + .tag("rowset meta pb", rs_meta.ShortDebugString()); + return -1; } + recycle_rowsets_number += 1; + recycle_segments_number += rs_meta.num_segments(); + recycle_rowsets_data_size += rs_meta.data_disk_size(); + recycle_rowsets_index_size += rs_meta.index_disk_size(); + max_rowset_version = std::max(max_rowset_version, rs_meta.end_version()); + min_rowset_creation_time = std::min(min_rowset_creation_time, rs_meta.creation_time()); + max_rowset_creation_time = std::max(max_rowset_creation_time, rs_meta.creation_time()); + min_rowset_expiration_time = std::min(min_rowset_expiration_time, rs_meta.txn_expiration()); + max_rowset_expiration_time = std::max(max_rowset_expiration_time, rs_meta.txn_expiration()); resource_ids.emplace(rs_meta.resource_id()); } - LOG_INFO("recycle tablet resource ids are") - .tag("", std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(), - [](const std::string& a, const std::string& b) { - return a.empty() ? b : a + "," + b; - })); + LOG_INFO("recycle tablet start to delete object") + .tag("instance id", instance_id_) + .tag("tablet id", tablet_id) + .tag("recycle tablet resource ids are", + std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + "," + b; + })); SyncExecutor concurrent_delete_executor( _thread_pool_group.s3_producer_pool, From bb14aef45bfd1727b045e98320e265c2d9213277 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 15 Jan 2025 16:15:33 +0800 Subject: [PATCH 5/7] 5 --- cloud/src/recycler/recycler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 692fe39b025abb..7e3ea2a6efac0e 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1676,7 +1676,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { if (!rs_meta.has_resource_id()) { LOG_WARNING("rowset meta does not have a resource id, impossible!") .tag("rs_meta", rs_meta.ShortDebugString()); - return -1; + return -1; } auto it = accessor_map_.find(rs_meta.resource_id()); // possible if the accessor is not initilized correctly From 78997871e496ab0a847514f69326a18b256a6910 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 15 Jan 2025 20:34:06 +0800 Subject: [PATCH 6/7] 6 --- cloud/src/recycler/recycler.cpp | 46 +++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 7e3ea2a6efac0e..19388bf153cf6f 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -542,7 +542,7 @@ int InstanceRecycler::init_storage_vault_accessors() { if (ret != 0) { LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() - << " hdfs_vault=" << vault.hdfs_info().DebugString(); + << " hdfs_vault=" << vault.hdfs_info().ShortDebugString(); continue; } @@ -551,7 +551,7 @@ int InstanceRecycler::init_storage_vault_accessors() { auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info()); if (!s3_conf) { LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id=" - << instance_id_ << " s3_vault=" << vault.obj_info().DebugString(); + << instance_id_ << " s3_vault=" << vault.obj_info().ShortDebugString(); continue; } @@ -560,7 +560,8 @@ int InstanceRecycler::init_storage_vault_accessors() { if (ret != 0) { LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() - << " ret=" << ret << " s3_vault=" << vault.obj_info().DebugString(); + << " ret=" << ret + << " s3_vault=" << vault.obj_info().ShortDebugString(); continue; } @@ -1567,6 +1568,12 @@ int InstanceRecycler::delete_rowset_data(const std::vectordelete_files(*paths); }); @@ -1614,14 +1621,6 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { int ret = 0; auto start_time = steady_clock::now(); - std::unique_ptr txn; - if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { - LOG_WARNING("failed to delete rowset kv of tablet ") - .tag("tablet id", tablet_id) - .tag("reason", "failed to create txn"); - ret = -1; - } - // collect resource ids std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0}); std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0}); @@ -1656,6 +1655,14 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("ret", ret); }); + std::unique_ptr txn; + if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to recycle tablet ") + .tag("tablet id", tablet_id) + .tag("instance_id", instance_id_) + .tag("reason", "failed to create txn"); + ret = -1; + } GetRowsetResponse resp; std::string msg; MetaServiceCode code = MetaServiceCode::OK; @@ -1675,7 +1682,9 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { for (const auto& rs_meta : resp.rowset_meta()) { if (!rs_meta.has_resource_id()) { LOG_WARNING("rowset meta does not have a resource id, impossible!") - .tag("rs_meta", rs_meta.ShortDebugString()); + .tag("rs_meta", rs_meta.ShortDebugString()) + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id); return -1; } auto it = accessor_map_.find(rs_meta.resource_id()); @@ -1742,9 +1751,18 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = finished ? ret : -1; if (ret != 0) { // failed recycle tablet data + LOG_WARNING("ret!=0").tag("finished", finished).tag("ret", ret); return ret; } + txn.reset(); + if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to recycle tablet ") + .tag("tablet id", tablet_id) + .tag("instance_id", instance_id_) + .tag("reason", "failed to create txn"); + ret = -1; + } // delete all rowset kv in this tablet txn->remove(rs_key0, rs_key1); txn->remove(recyc_rs_key0, recyc_rs_key1); @@ -2344,7 +2362,7 @@ int InstanceRecycler::abort_timeout_txn() { txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED); txn_info.set_finish_time(current_time); txn_info.set_reason("timeout"); - VLOG_DEBUG << "txn_info=" << txn_info.DebugString(); + VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString(); txn_inf_val.clear(); if (!txn_info.SerializeToString(&txn_inf_val)) { LOG_WARNING("failed to serialize txn info").tag("key", hex(k)); @@ -3022,7 +3040,7 @@ int InstanceRecycler::recycle_expired_stage_objects() { const auto& old_obj = instance_info_.obj_info()[idx - 1]; auto s3_conf = S3Conf::from_obj_store_info(old_obj); if (!s3_conf) { - LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.DebugString(); + LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.ShortDebugString(); continue; } From ae3983ad8dbbbcff190da04efabcf064fa48c2ad Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 16 Jan 2025 02:28:45 +0800 Subject: [PATCH 7/7] 7 --- cloud/src/recycler/recycler.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 19388bf153cf6f..97f216d6829324 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1751,7 +1751,11 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = finished ? ret : -1; if (ret != 0) { // failed recycle tablet data - LOG_WARNING("ret!=0").tag("finished", finished).tag("ret", ret); + LOG_WARNING("ret!=0") + .tag("finished", finished) + .tag("ret", ret) + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id); return ret; }