From 50857311cba09021e8029360a44ade44c28f5f0e Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Sun, 14 Dec 2025 14:46:56 +0800 Subject: [PATCH 1/4] [chore](cloud) Change range iterator check pattern from do..while() to while() to get better robustness The original implementation of do...while() check is not extensible for retry if we failed to do the first `txn->get()` in the while loop. It will end with referencing null pointer at the check `while (it->more())` --- cloud/src/meta-service/http_encode_key.cpp | 5 +- cloud/src/meta-service/meta_service.cpp | 17 ++-- cloud/src/meta-service/meta_service_job.cpp | 8 +- .../meta-service/meta_service_resource.cpp | 8 +- .../meta_service_tablet_stats.cpp | 4 +- cloud/src/meta-service/meta_service_txn.cpp | 16 ++-- cloud/src/meta-store/blob_message.cpp | 9 +-- cloud/src/meta-store/txn_kv.cpp | 2 +- cloud/src/recycler/checker.cpp | 79 ++++++++++--------- cloud/src/recycler/meta_checker.cpp | 4 +- cloud/src/recycler/recycler.cpp | 28 ++++--- cloud/src/recycler/util.cpp | 4 +- .../src/resource-manager/resource_manager.cpp | 4 +- cloud/test/mem_txn_kv_test.cpp | 4 +- cloud/test/meta_service_test.cpp | 14 ++-- cloud/test/recycler_test.cpp | 20 ++--- cloud/test/txn_kv_test.cpp | 12 +-- 17 files changed, 122 insertions(+), 116 deletions(-) diff --git a/cloud/src/meta-service/http_encode_key.cpp b/cloud/src/meta-service/http_encode_key.cpp index 4ee76af5c9f4e0..1fb58bff23c11a 100644 --- a/cloud/src/meta-service/http_encode_key.cpp +++ b/cloud/src/meta-service/http_encode_key.cpp @@ -303,13 +303,14 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) { std::string end_key = key + "\xff"; std::unique_ptr it; bool more = false; - do { + while (it == nullptr /* may be not init */ || more) { err = txn->get(begin_key, end_key, &it, true); if (err != TxnErrorCode::TXN_OK) break; begin_key = it->next_begin_key(); more = it->more(); value.iters.push_back(std::move(it)); - } while (more); + if (!more) break; + } } else { err = cloud::blob_get(txn.get(), key, &value, true); } diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 919d9050fd387c..0a91aaa6f533f1 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1319,7 +1319,7 @@ void scan_restore_job_rowset( }); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(restore_job_rs_key0, restore_job_rs_key1, &it, true); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -1346,7 +1346,7 @@ void scan_restore_job_rowset( if (!it->has_next()) restore_job_rs_key0 = k; } restore_job_rs_key0.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } return; } @@ -2863,7 +2863,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, }; std::stringstream ss; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(key0, key1, &it); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -2900,7 +2900,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, } } key0.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } } std::vector> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt, @@ -4155,7 +4155,7 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control int64_t last_ver = -1; int64_t last_seg_id = -1; int64_t round = 0; - do { + while (it == nullptr /* may be not init */ || it->more()) { if (test) { LOG(INFO) << "test"; err = txn->get(start_key, end_key, &it, false, 2); @@ -4233,7 +4233,7 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control if (code != MetaServiceCode::OK) return; round++; start_key = it->next_begin_key(); // Update to next smallest key for iteration - } while (it->more()); + } LOG(INFO) << "get delete bitmap for tablet=" << tablet_id << ", rowset=" << rowset_ids[i] << ", start version=" << begin_versions[i] << ", end version=" << end_versions[i] << ", internal round=" << round @@ -4872,7 +4872,8 @@ void MetaServiceImpl::get_delete_bitmap_update_lock_v2( MowTabletJobPB mow_tablet_job; std::unique_ptr it; int64_t expired_job_num = 0; - do { + while (it == nullptr /* may be not init */ || + (it->more() && !has_unexpired_compaction)) { err = txn->get(key0, key1, &it); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -4903,7 +4904,7 @@ void MetaServiceImpl::get_delete_bitmap_update_lock_v2( } } key0 = it->next_begin_key(); // Update to next smallest key for iteration - } while (it->more() && !has_unexpired_compaction); + } if (has_unexpired_compaction) { // TODO print initiator ss << "already be locked by lock_id=" << lock_info.lock_id() diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index dc14f7fa3f6662..9fc1824c091692 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -801,7 +801,7 @@ std::pair scan_compaction_input_rowsets( }; auto rs_start1 = rs_start; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(rs_start1, rs_end, &it); if (err != TxnErrorCode::TXN_OK) { return {cast_as(err), @@ -827,7 +827,7 @@ std::pair scan_compaction_input_rowsets( if (!it->has_next()) rs_start1 = k; } rs_start1.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } return {MetaServiceCode::OK, ""}; } @@ -1325,7 +1325,7 @@ std::pair scan_schema_change_input_rowsets( std::string& rs_start, std::string& rs_end, auto&& callback) { std::unique_ptr it; auto rs_start1 = rs_start; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(rs_start1, rs_end, &it); if (err != TxnErrorCode::TXN_OK) { return {MetaServiceCode::KV_TXN_GET_ERR, @@ -1351,7 +1351,7 @@ std::pair scan_schema_change_input_rowsets( if (!it->has_next()) rs_start1 = k; } rs_start1.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } return {MetaServiceCode::OK, ""}; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 7df96f3e727e2b..8843e5ab4733d1 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -509,7 +509,7 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro std::string storage_vault_start = storage_vault_key({instance.instance_id(), ""}); std::string storage_vault_end = storage_vault_key({instance.instance_id(), "\xff"}); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(storage_vault_start, storage_vault_end, &it); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -533,7 +533,7 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro } } storage_vault_start.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } } for (auto& vault : *response->mutable_storage_vault()) { if (vault.has_obj_info()) { @@ -4416,7 +4416,7 @@ void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller copy_job_key(key_info0, &key0); copy_job_key(key_info1, &key1); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(key0, key1, &it); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -4440,7 +4440,7 @@ void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller } } key0.push_back('\x00'); - } while (it->more()); + } } void MetaServiceImpl::filter_copy_files(google::protobuf::RpcController* controller, diff --git a/cloud/src/meta-service/meta_service_tablet_stats.cpp b/cloud/src/meta-service/meta_service_tablet_stats.cpp index 9543cd724c1a02..83b097b1726044 100644 --- a/cloud/src/meta-service/meta_service_tablet_stats.cpp +++ b/cloud/src/meta-service/meta_service_tablet_stats.cpp @@ -50,7 +50,7 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact 7); // aggregate + data_size + num_rows + num_rowsets + num_segments + index_size + segment_size std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -64,7 +64,7 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact std::string {v.data(), v.size()}); } begin_key = it->next_begin_key(); - } while (it->more()); + } if (stats_kvs.empty()) { code = MetaServiceCode::TABLET_NOT_FOUND; diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 41ee486f50aa54..cf858dde5de3e3 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1134,7 +1134,7 @@ void scan_tmp_rowset( }; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); if (err == TxnErrorCode::TXN_TOO_OLD) { err = txn_kv->create_txn(&txn); @@ -1169,7 +1169,7 @@ void scan_tmp_rowset( if (!it->has_next()) rs_tmp_key0 = k; } rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } VLOG_DEBUG << "txn_id=" << txn_id << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta->size(); return; @@ -3805,7 +3805,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll int64_t abort_txn_cnt = 0; int64_t total_iteration_cnt = 0; bool need_commit = false; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(begin_info_key, end_info_key, &it, true); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -3847,7 +3847,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll } } begin_info_key.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } LOG(INFO) << "abort txn count: " << abort_txn_cnt << " total iteration count: " << total_iteration_cnt; if (need_commit) { @@ -3924,7 +3924,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont int64_t skip_timeout_txn_cnt = 0; int total_iteration_cnt = 0; bool finished = true; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(begin_running_key, end_running_key, &it, true); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -4002,7 +4002,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont } } begin_running_key.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt << " conflict txn count: " << response->conflict_txns_size() << " total iteration count: " << total_iteration_cnt; @@ -4195,7 +4195,7 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control bool snapshot = true; int limit = 1000; TEST_SYNC_POINT_CALLBACK("clean_txn_label:limit", &limit); - do { + while (it == nullptr /* may be not init */ || it->more()) { std::unique_ptr txn; auto err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -4241,7 +4241,7 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control } } begin_label_key.push_back('\x00'); - } while (it->more()); + } } else { const std::string& label = request->labels(0); const std::string label_key = txn_label_key({instance_id, db_id, label}); diff --git a/cloud/src/meta-store/blob_message.cpp b/cloud/src/meta-store/blob_message.cpp index 90b2f298884fb3..c7e3edef63dcdb 100644 --- a/cloud/src/meta-store/blob_message.cpp +++ b/cloud/src/meta-store/blob_message.cpp @@ -123,17 +123,16 @@ TxnErrorCode ValueBuf::get(Transaction* txn, std::string_view key, bool snapshot } begin_key = it->next_begin_key(); iters.push_back(std::move(it)); - do { + while (it == nullptr /* may be not init */ || more) { err = txn->get(begin_key, end_key, &it, snapshot); if (err != TxnErrorCode::TXN_OK) { return err; } + begin_key = it->next_begin_key(); more = it->more(); - if (more) { - begin_key = it->next_begin_key(); - } iters.push_back(std::move(it)); - } while (more); + if (!more) break; + } return TxnErrorCode::TXN_OK; } diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp index d7c27503a10532..4e98cd4c01176d 100644 --- a/cloud/src/meta-store/txn_kv.cpp +++ b/cloud/src/meta-store/txn_kv.cpp @@ -881,7 +881,7 @@ TxnErrorCode Transaction::get_conflicting_range( FDBKeyValue const* out_kvs; int out_kvs_count; - fdb_bool_t out_more; + fdb_bool_t out_more = false; do { fdb_error_t err = fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more); diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp index c7710d17af8f08..5224faaf83579e 100644 --- a/cloud/src/recycler/checker.cpp +++ b/cloud/src/recycler/checker.cpp @@ -754,7 +754,7 @@ int InstanceChecker::do_check() { auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0}); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -784,7 +784,7 @@ int InstanceChecker::do_check() { check_rowset_objects(rs_meta, k); } start_key.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more() && !stopped()); + } return num_rowset_loss > 0 ? 1 : check_ret; } @@ -899,7 +899,7 @@ int InstanceChecker::do_inverted_check() { std::unique_ptr it; auto begin = meta_rowset_key({instance_id_, tablet_id, 0}); auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { TxnErrorCode err = txn->get(begin, end, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get rowset kv, err=" << err; @@ -923,7 +923,7 @@ int InstanceChecker::do_inverted_check() { break; } } - } while (it->more() && !stopped()); + } if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) { // Garbage data leak @@ -1025,7 +1025,7 @@ int InstanceChecker::traverse_mow_tablet(const std::function std::unique_ptr it; auto begin = meta_rowset_key({instance_id_, 0, 0}); auto end = meta_rowset_key({instance_id_, std::numeric_limits::max(), 0}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1077,7 +1077,7 @@ int InstanceChecker::traverse_mow_tablet(const std::function } } } - } while (it->more() && !stopped()); + } return 0; } @@ -1089,7 +1089,7 @@ int InstanceChecker::traverse_rowset_delete_bitmaps( auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, std::numeric_limits::max(), std::numeric_limits::max()}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1125,7 +1125,7 @@ int InstanceChecker::traverse_rowset_delete_bitmaps( break; } } - } while (it->more() && !stopped()); + } return 0; } @@ -1143,7 +1143,7 @@ int InstanceChecker::collect_tablet_rowsets( auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0}); int64_t rowsets_num {0}; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { TxnErrorCode err = txn->get(begin, end, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get rowset kv, err=" << err; @@ -1169,7 +1169,7 @@ int InstanceChecker::collect_tablet_rowsets( break; } } - } while (it->more() && !stopped()); + } LOG(INFO) << fmt::format( "[delete bitmap checker] successfully collect rowsets for instance_id={}, " @@ -1225,7 +1225,7 @@ int InstanceChecker::do_delete_bitmap_inverted_check() { auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0}); auto end = meta_delete_bitmap_key({instance_id_, std::numeric_limits::max(), "", 0, 0}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1327,7 +1327,7 @@ int InstanceChecker::do_delete_bitmap_inverted_check() { instance_id_, tablet_id, rowset_id, version, segment_id); } } - } while (it->more() && !stopped()); + } return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0; } @@ -1409,7 +1409,7 @@ int InstanceChecker::check_inverted_index_file_storage_format_v1( std::unique_ptr it; auto begin = meta_rowset_key({instance_id_, tablet_id, 0}); auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { TxnErrorCode err = txn->get(begin, end, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get rowset kv, err=" << err; @@ -1470,7 +1470,7 @@ int InstanceChecker::check_inverted_index_file_storage_format_v1( break; } } - } while (it->more() && !stopped()); + } if (!rowset_index_cache_v1.segment_ids.contains(segment_id)) { // Garbage data leak @@ -1533,7 +1533,7 @@ int InstanceChecker::check_inverted_index_file_storage_format_v2( std::unique_ptr it; auto begin = meta_rowset_key({instance_id_, tablet_id, 0}); auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX}); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { TxnErrorCode err = txn->get(begin, end, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get rowset kv, err=" << err; @@ -1561,7 +1561,7 @@ int InstanceChecker::check_inverted_index_file_storage_format_v2( break; } } - } while (it->more() && !stopped()); + } if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) { // Garbage data leak @@ -1655,7 +1655,7 @@ int InstanceChecker::check_delete_bitmap_storage_optimize_v2( }; using namespace std::chrono; int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1728,7 +1728,7 @@ int InstanceChecker::check_delete_bitmap_storage_optimize_v2( } last_failed_version = version; } - } while (it->more() && !stopped()); + } if (!failed_versions.empty()) { print_failed_versions(); } @@ -1792,7 +1792,7 @@ int InstanceChecker::do_mow_job_key_check() { std::string begin = mow_tablet_job_key({instance_id_, 0, 0}); std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0}); MowTabletJobPB mow_tablet_job; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1855,7 +1855,7 @@ int InstanceChecker::do_mow_job_key_check() { } } begin = it->next_begin_key(); // Update to next smallest key for iteration - } while (it->more() && !stopped()); + } return 0; } int InstanceChecker::do_tablet_stats_key_check() { @@ -2110,7 +2110,7 @@ int InstanceChecker::scan_and_handle_kv( std::unique_ptr it; int limit = 10000; TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:limit", &limit); - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { err = txn->get(start_key, end_key, &it, false, limit); TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:get_err", &err); if (err == TxnErrorCode::TXN_TOO_OLD) { @@ -2140,29 +2140,29 @@ int InstanceChecker::scan_and_handle_kv( } } start_key = it->next_begin_key(); - } while (it->more() && !stopped()); + } return ret; } int InstanceChecker::do_version_key_check() { - std::unique_ptr it; + std::unique_ptr table_it; std::string begin = table_version_key({instance_id_, 0, 0}); std::string end = table_version_key({instance_id_, INT64_MAX, 0}); bool check_res = true; - do { + while (table_it == nullptr /* may be not init */ || (table_it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to create txn"; return -1; } - err = txn->get(begin, end, &it); + err = txn->get(begin, end, &table_it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get mow tablet job key, err=" << err; return -1; } - while (it->has_next() && !stopped()) { - auto [k, v] = it->next(); + while (table_it->has_next() && !stopped()) { + auto [k, v] = table_it->next(); std::string_view k1 = k; k1.remove_prefix(1); std::vector, int, int>> out; @@ -2181,20 +2181,21 @@ int InstanceChecker::do_version_key_check() { partition_version_key({instance_id_, db_id, table_id, INT64_MAX}); VersionPB partition_version_pb; - do { + std::unique_ptr part_it; + while (part_it == nullptr /* may be not init */ || (part_it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to create txn"; return -1; } - err = txn->get(partition_version_key_begin, partition_version_key_end, &it); + err = txn->get(partition_version_key_begin, partition_version_key_end, &part_it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get mow tablet job key, err=" << err; return -1; } - while (it->has_next() && !stopped()) { - auto [k, v] = it->next(); + while (part_it->has_next() && !stopped()) { + auto [k, v] = part_it->next(); // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id} std::string_view k1 = k; k1.remove_prefix(1); @@ -2215,11 +2216,11 @@ int InstanceChecker::do_version_key_check() { << " partition_version: " << partition_version; } } - partition_version_key_begin = it->next_begin_key(); - } while (it->more() && !stopped()); + partition_version_key_begin = part_it->next_begin_key(); + } } - begin = it->next_begin_key(); // Update to next smallest key for iteration - } while (it->more() && !stopped()); + begin = table_it->next_begin_key(); // Update to next smallest key for iteration + } return check_res ? 0 : -1; } @@ -2259,7 +2260,7 @@ int InstanceChecker::do_restore_job_check() { job_restore_tablet_key(restore_job_key_info0, &begin); job_restore_tablet_key(restore_job_key_info1, &end); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -2323,7 +2324,7 @@ int InstanceChecker::do_restore_job_check() { break; } } - } while (it->more() && !stopped()); + } return 0; } @@ -2753,7 +2754,7 @@ int InstanceChecker::do_packed_file_check() { std::string end_key = meta_rowset_key({instance_id_, INT64_MAX, 0}); std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { if (stopped()) { return -1; } @@ -2799,7 +2800,7 @@ int InstanceChecker::do_packed_file_check() { } } start_key.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more() && !stopped()); + } } // Step 2: Scan all packed file metadata and verify diff --git a/cloud/src/recycler/meta_checker.cpp b/cloud/src/recycler/meta_checker.cpp index 3fbd8ab451d417..2b37143713022b 100644 --- a/cloud/src/recycler/meta_checker.cpp +++ b/cloud/src/recycler/meta_checker.cpp @@ -55,7 +55,7 @@ bool MetaChecker::scan_and_handle_kv( return false; } std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(start_key, end_key, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get tablet idx, ret=" << err; @@ -71,7 +71,7 @@ bool MetaChecker::scan_and_handle_kv( } } start_key.push_back('\x00'); - } while (it->more()); + } return true; } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index eabe17dce34cb9..5bd971db48a0ac 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -4881,11 +4881,11 @@ int InstanceRecycler::scan_and_recycle( }; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { if (get_range_retried > 1000) { - err = "txn_get exceeds max retry, may not scan all keys"; - ret = -1; - return -1; + err = "txn_get exceeds max retry(1000), may not scan all keys"; + ret = -2; + return ret; } int get_ret = txn_get(txn_kv_.get(), begin, end, it); if (get_ret != 0) { // txn kv may complain "Request for future version" @@ -4909,18 +4909,24 @@ int InstanceRecycler::scan_and_recycle( VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); } // if we want to continue scanning, the recycle_func should not return non-zero - if (recycle_func(k, v) != 0) { - err = "recycle_func error"; + int recycle_func_ret = recycle_func(k, v); + if (recycle_func_ret != 0) { + err = "recycle_func error, ret=" + std::to_string(recycle_func_ret); ret = -1; + return ret; } } begin.push_back('\x00'); // Update to next smallest key for iteration - // if we want to continue scanning, the recycle_func should not return non-zero - if (loop_done && loop_done() != 0) { - err = "loop_done error"; - ret = -1; + // if we want to continue scanning, the loop_done should not return non-zero + if (loop_done) { + int loop_done_ret = loop_done(); + if (loop_done_ret != 0) { + err = "loop_done error, ret=" + std::to_string(loop_done_ret); + ret = -1; + return ret; + } } - } while (it->more() && !stopped()); + } return ret; } diff --git a/cloud/src/recycler/util.cpp b/cloud/src/recycler/util.cpp index 3666b6836e6c9c..e419ad221ba821 100644 --- a/cloud/src/recycler/util.cpp +++ b/cloud/src/recycler/util.cpp @@ -48,7 +48,7 @@ int get_all_instances(TxnKv* txn_kv, std::vector& res) { } std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(key0, key1, &it); if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get instance, err=" << err; @@ -67,7 +67,7 @@ int get_all_instances(TxnKv* txn_kv, std::vector& res) { res.push_back(std::move(instance_info)); } key0.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } return 0; } diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 6af1673bd45677..7c20f06dfbcbb6 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -64,7 +64,7 @@ int ResourceManager::init() { std::vector> instances; int limit = 10000; TEST_SYNC_POINT_CALLBACK("ResourceManager:init:limit", &limit); - do { + while (it == nullptr /* may be not init */ || it->more()) { TxnErrorCode err = txn->get(key0, key1, &it, false, limit); TEST_SYNC_POINT_CALLBACK("ResourceManager:init:get_err", &err); if (err == TxnErrorCode::TXN_TOO_OLD) { @@ -105,7 +105,7 @@ int ResourceManager::init() { ++num_instances; } key0.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } std::unique_lock l(mtx_); for (auto& [inst_id, inst] : instances) { diff --git a/cloud/test/mem_txn_kv_test.cpp b/cloud/test/mem_txn_kv_test.cpp index f9012fb144d438..8813264732405b 100644 --- a/cloud/test/mem_txn_kv_test.cpp +++ b/cloud/test/mem_txn_kv_test.cpp @@ -1301,7 +1301,7 @@ TEST(TxnMemKvTest, ReverseFullRangeGet) { std::string begin = range_begin, end = range_end; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(begin, end, &it, options); ASSERT_EQ(err, TxnErrorCode::TXN_OK); @@ -1312,7 +1312,7 @@ TEST(TxnMemKvTest, ReverseFullRangeGet) { // Get next begin key for reverse range get end = it->last_key(); options.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL; - } while (it->more()); + } } std::vector actual_keys; diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 266bc7efb92160..0e89d2d91809df 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -3671,9 +3671,8 @@ TEST(MetaServiceTest, CopyJobTest) { copy_file_key(key_info0, &key0); copy_file_key(key_info1, &key1); std::unique_ptr it; - ASSERT_EQ(txn->get(key0, key1, &it), TxnErrorCode::TXN_OK); int file_cnt = 0; - do { + while (it == nullptr /* may be not init */ || it->more()) { ASSERT_EQ(txn->get(key0, key1, &it), TxnErrorCode::TXN_OK); while (it->has_next()) { auto [k, v] = it->next(); @@ -3686,7 +3685,7 @@ TEST(MetaServiceTest, CopyJobTest) { } } key0.push_back('\x00'); - } while (it->more()); + } ASSERT_EQ(file_cnt, 20); } // 1 copy job with finish status @@ -3699,7 +3698,7 @@ TEST(MetaServiceTest, CopyJobTest) { copy_job_key(key_info1, &key1); std::unique_ptr it; int job_cnt = 0; - do { + while (it == nullptr /* may be not init */ || it->more()) { ASSERT_EQ(txn->get(key0, key1, &it), TxnErrorCode::TXN_OK); while (it->has_next()) { auto [k, v] = it->next(); @@ -3713,7 +3712,7 @@ TEST(MetaServiceTest, CopyJobTest) { } } key0.push_back('\x00'); - } while (it->more()); + } ASSERT_EQ(job_cnt, 1); } } @@ -4160,9 +4159,8 @@ TEST(MetaServiceTest, StageTest) { std::string get_val; ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); std::unique_ptr it; - ASSERT_EQ(txn->get(key0, key1, &it), TxnErrorCode::TXN_OK); int stage_cnt = 0; - do { + while (it == nullptr /* may be not init */ || it->more()) { ASSERT_EQ(txn->get(key0, key1, &it), TxnErrorCode::TXN_OK); while (it->has_next()) { auto [k, v] = it->next(); @@ -4172,7 +4170,7 @@ TEST(MetaServiceTest, StageTest) { } } key0.push_back('\x00'); - } while (it->more()); + } ASSERT_EQ(stage_cnt, 1); } diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index abb1d7069988e3..3533049c60f7d3 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -1215,7 +1215,7 @@ static int get_copy_file_num(TxnKv* txn_kv, const std::string& stage_id, int64_t return -1; } std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { if (txn->get(key0, key1, &it) != TxnErrorCode::TXN_OK) { return -1; } @@ -1224,7 +1224,7 @@ static int get_copy_file_num(TxnKv* txn_kv, const std::string& stage_id, int64_t ++(*file_num); } key0.push_back('\x00'); - } while (it->more()); + } return 0; } @@ -1242,14 +1242,14 @@ static void check_delete_bitmap_keys_size(TxnKv* txn_kv, int64_t tablet_id, int dbm_end_key = meta_delete_bitmap_key({instance_id, tablet_id + 1, "", 0, 0}); } int size = 0; - do { + while (it == nullptr /* may be not init */ || it->more()) { ASSERT_EQ(txn->get(dbm_start_key, dbm_end_key, &it), TxnErrorCode::TXN_OK); while (it->has_next()) { it->next(); size++; } dbm_start_key = it->next_begin_key(); - } while (it->more()); + } EXPECT_EQ(size, expected_size); } @@ -3818,7 +3818,7 @@ TEST(CheckerTest, abnormal_inverted_check_index_file_v1) { DCHECK_EQ(err, TxnErrorCode::TXN_OK) << err; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(meta_rowset_key_begin, meta_rowset_key_end, &it); while (it->has_next()) { auto [k, v] = it->next(); @@ -3830,7 +3830,7 @@ TEST(CheckerTest, abnormal_inverted_check_index_file_v1) { } } meta_rowset_key_begin.push_back('\x00'); - } while (it->more()); + } for (const auto& key : rowset_key_to_delete) { std::unique_ptr txn; @@ -3903,7 +3903,7 @@ TEST(CheckerTest, abnormal_inverted_check_index_file_v2) { DCHECK_EQ(err, TxnErrorCode::TXN_OK) << err; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(meta_rowset_key_begin, meta_rowset_key_end, &it); while (it->has_next()) { auto [k, v] = it->next(); @@ -3915,7 +3915,7 @@ TEST(CheckerTest, abnormal_inverted_check_index_file_v2) { } } meta_rowset_key_begin.push_back('\x00'); - } while (it->more()); + } for (const auto& key : rowset_key_to_delete) { std::unique_ptr txn; @@ -7318,7 +7318,7 @@ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t s int64_t total_kv = 0; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { int get_ret = txn_get(txn_kv.get(), begin, end, it); if (get_ret != 0) { // txn kv may complain "Request for future version" LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) @@ -7340,7 +7340,7 @@ void check_multiple_txn_info_kvs(std::shared_ptr txn_kv, int64_t s total_kv++; } begin.push_back('\x00'); // Update to next smallest key for iteration - } while (it->more()); + } ASSERT_EQ(total_kv, size); } diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp index 7be9845bc46ae4..b0ff5b24da41f0 100644 --- a/cloud/test/txn_kv_test.cpp +++ b/cloud/test/txn_kv_test.cpp @@ -982,7 +982,7 @@ TEST(TxnKvTest, FullRangeGetIterator) { auto inner_begin = begin; cnt = 0; start = std::chrono::steady_clock::now(); - do { + while (inner_it == nullptr /* may be not init */ || inner_it->more()) { err = txn->get(inner_begin, end, &inner_it, false, 11); ASSERT_EQ(err, TxnErrorCode::TXN_OK); if (!inner_it->has_next()) { @@ -998,7 +998,7 @@ TEST(TxnKvTest, FullRangeGetIterator) { } } inner_begin.push_back('\x00'); // Update to next smallest key for iteration - } while (inner_it->more()); + } finish = std::chrono::steady_clock::now(); EXPECT_EQ(cnt, 100); std::cout << "RangeGetIterator cost=" @@ -1312,7 +1312,7 @@ TEST(TxnKvTest, ReverseFullRangeGet) { std::string begin = range_begin, end = range_end; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(begin, end, &it, options); ASSERT_EQ(err, TxnErrorCode::TXN_OK); @@ -1323,7 +1323,7 @@ TEST(TxnKvTest, ReverseFullRangeGet) { // Get next begin key for reverse range get end = it->last_key(); options.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL; - } while (it->more()); + } } std::vector actual_keys; @@ -1419,7 +1419,7 @@ TEST(TxnKvTest, ReverseFullRangeGet2) { std::string begin = range_begin, end = range_end; std::unique_ptr it; - do { + while (it == nullptr /* may be not init */ || it->more()) { err = txn->get(begin, end, &it, options); ASSERT_EQ(err, TxnErrorCode::TXN_OK); @@ -1430,7 +1430,7 @@ TEST(TxnKvTest, ReverseFullRangeGet2) { // Get next begin key for reverse range get end = it->last_key(); options.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL; - } while (it->more()); + } } std::vector actual_keys; From acc9ddab0d8f0e0e9a0afe85ec0806ea6f775a85 Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Sun, 14 Dec 2025 21:32:46 +0800 Subject: [PATCH 2/4] Fix UT --- cloud/test/recycler_test.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 3533049c60f7d3..33264d048d12ba 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -7416,12 +7416,12 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) { DORIS_CLOUD_DEFER { SyncPoint::get_instance()->clear_all_call_backs(); }; + size_t recycle_txn_info_keys_cnt = 0; sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys", - [](auto&& args) { + [&](auto&& args) { auto* recycle_txn_info_keys = try_any_cast*>(args[0]); - - ASSERT_LE(recycle_txn_info_keys->size(), 10000); + recycle_txn_info_keys_cnt = recycle_txn_info_keys->size(); }); sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure", [](auto&& args) { auto* ret = try_any_cast(args[0]); @@ -7440,7 +7440,7 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) { std::cout << "recycle expired txn label cost=" << std::chrono::duration_cast(finish - start).count() << "ms" << std::endl; - check_multiple_txn_info_kvs(txn_kv, 5000); + check_multiple_txn_info_kvs(txn_kv, (20000 - recycle_txn_info_keys_cnt)); } TEST(RecyclerTest, concurrent_recycle_txn_label_conflict_test) { config::label_keep_max_second = 0; From ea3451b7876f3848b570d7a4437985b57067926c Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Tue, 16 Dec 2025 14:29:19 +0800 Subject: [PATCH 3/4] Opt comments --- cloud/src/recycler/recycler.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index a9f2b2c662d4b2..5d961c240d7a6a 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -410,10 +410,15 @@ class InstanceRecycler { int init_storage_vault_accessors(); /** - * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair. + * Scan key-value pairs between [`begin`, `end`) with multiple rounds of range get(`RangeGetIterator`), + * and perform `recycle_func` on each key-value pair. * - * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful. - * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 if success. + * @param recycle_func defines how to recycle resources corresponding to a key-value pair. + * The scan will stop if recycle_func() returns non-zero. + * recycle_func() returns 0 if the recycling is successful or the scan can continue with ignorable errors. + * @param loop_done is called after a round (`RangeGetIterator`) in the scan has no next kv. Usually used to perform a batch recycling. + * The scan will stop if loop_done() returns non-zero. + * loop_done() returns 0 if the recycling is successful or the scan can continue with ignorable errors. * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero */ int scan_and_recycle(std::string begin, std::string_view end, From 67f42b1073b282777616d11e079c98f0ac529b9a Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Thu, 18 Dec 2025 12:09:47 +0800 Subject: [PATCH 4/4] Restore recycle_func/loop_done return --- cloud/src/recycler/recycler.cpp | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 5bd971db48a0ac..372b518e7603db 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -4884,7 +4884,7 @@ int InstanceRecycler::scan_and_recycle( while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { if (get_range_retried > 1000) { err = "txn_get exceeds max retry(1000), may not scan all keys"; - ret = -2; + ret = -3; return ret; } int get_ret = txn_get(txn_kv_.get(), begin, end, it); @@ -4908,23 +4908,17 @@ int InstanceRecycler::scan_and_recycle( begin = k; VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); } - // if we want to continue scanning, the recycle_func should not return non-zero - int recycle_func_ret = recycle_func(k, v); - if (recycle_func_ret != 0) { - err = "recycle_func error, ret=" + std::to_string(recycle_func_ret); + // FIXME(gavin): if we want to continue scanning, the recycle_func should not return non-zero + if (recycle_func(k, v) != 0) { + err = "recycle_func error"; ret = -1; - return ret; } } begin.push_back('\x00'); // Update to next smallest key for iteration - // if we want to continue scanning, the loop_done should not return non-zero - if (loop_done) { - int loop_done_ret = loop_done(); - if (loop_done_ret != 0) { - err = "loop_done error, ret=" + std::to_string(loop_done_ret); - ret = -1; - return ret; - } + // FIXME(gavin): if we want to continue scanning, the loop_done should not return non-zero + if (loop_done && loop_done() != 0) { + err = "loop_done error"; + ret = -1; } } return ret;