Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cloud/src/meta-service/http_encode_key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,14 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) {
std::string end_key = key + "\xff";
std::unique_ptr<RangeGetIterator> it;
bool more = false;
do {
while (it == nullptr /* may be not init */ || more) {
err = txn->get(begin_key, end_key, &it, true);
if (err != TxnErrorCode::TXN_OK) break;
begin_key = it->next_begin_key();
more = it->more();
value.iters.push_back(std::move(it));
} while (more);
if (!more) break;
}
} else {
err = cloud::blob_get(txn.get(), key, &value, true);
}
Expand Down
17 changes: 9 additions & 8 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ void scan_restore_job_rowset(
});

std::unique_ptr<RangeGetIterator> it;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(restore_job_rs_key0, restore_job_rs_key1, &it, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand All @@ -1346,7 +1346,7 @@ void scan_restore_job_rowset(
if (!it->has_next()) restore_job_rs_key0 = k;
}
restore_job_rs_key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
return;
}

Expand Down Expand Up @@ -2863,7 +2863,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
};

std::stringstream ss;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(key0, key1, &it);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand Down Expand Up @@ -2900,7 +2900,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
}
}
key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
}

std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt,
Expand Down Expand Up @@ -4155,7 +4155,7 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
int64_t last_ver = -1;
int64_t last_seg_id = -1;
int64_t round = 0;
do {
while (it == nullptr /* may be not init */ || it->more()) {
if (test) {
LOG(INFO) << "test";
err = txn->get(start_key, end_key, &it, false, 2);
Expand Down Expand Up @@ -4233,7 +4233,7 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
if (code != MetaServiceCode::OK) return;
round++;
start_key = it->next_begin_key(); // Update to next smallest key for iteration
} while (it->more());
}
LOG(INFO) << "get delete bitmap for tablet=" << tablet_id
<< ", rowset=" << rowset_ids[i] << ", start version=" << begin_versions[i]
<< ", end version=" << end_versions[i] << ", internal round=" << round
Expand Down Expand Up @@ -4872,7 +4872,8 @@ void MetaServiceImpl::get_delete_bitmap_update_lock_v2(
MowTabletJobPB mow_tablet_job;
std::unique_ptr<RangeGetIterator> it;
int64_t expired_job_num = 0;
do {
while (it == nullptr /* may be not init */ ||
(it->more() && !has_unexpired_compaction)) {
err = txn->get(key0, key1, &it);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand Down Expand Up @@ -4903,7 +4904,7 @@ void MetaServiceImpl::get_delete_bitmap_update_lock_v2(
}
}
key0 = it->next_begin_key(); // Update to next smallest key for iteration
} while (it->more() && !has_unexpired_compaction);
}
if (has_unexpired_compaction) {
// TODO print initiator
ss << "already be locked by lock_id=" << lock_info.lock_id()
Expand Down
8 changes: 4 additions & 4 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ std::pair<MetaServiceCode, std::string> scan_compaction_input_rowsets(
};

auto rs_start1 = rs_start;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
if (err != TxnErrorCode::TXN_OK) {
return {cast_as<ErrCategory::READ>(err),
Expand All @@ -827,7 +827,7 @@ std::pair<MetaServiceCode, std::string> scan_compaction_input_rowsets(
if (!it->has_next()) rs_start1 = k;
}
rs_start1.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
return {MetaServiceCode::OK, ""};
}

Expand Down Expand Up @@ -1325,7 +1325,7 @@ std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
std::string& rs_start, std::string& rs_end, auto&& callback) {
std::unique_ptr<RangeGetIterator> it;
auto rs_start1 = rs_start;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_GET_ERR,
Expand All @@ -1351,7 +1351,7 @@ std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
if (!it->has_next()) rs_start1 = k;
}
rs_start1.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
return {MetaServiceCode::OK, ""};
}

Expand Down
8 changes: 4 additions & 4 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro
std::string storage_vault_start = storage_vault_key({instance.instance_id(), ""});
std::string storage_vault_end = storage_vault_key({instance.instance_id(), "\xff"});
std::unique_ptr<RangeGetIterator> it;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(storage_vault_start, storage_vault_end, &it);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand All @@ -533,7 +533,7 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro
}
}
storage_vault_start.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
}
for (auto& vault : *response->mutable_storage_vault()) {
if (vault.has_obj_info()) {
Expand Down Expand Up @@ -4416,7 +4416,7 @@ void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller
copy_job_key(key_info0, &key0);
copy_job_key(key_info1, &key1);
std::unique_ptr<RangeGetIterator> it;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(key0, key1, &it);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand All @@ -4440,7 +4440,7 @@ void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller
}
}
key0.push_back('\x00');
} while (it->more());
}
}

void MetaServiceImpl::filter_copy_files(google::protobuf::RpcController* controller,
Expand Down
4 changes: 2 additions & 2 deletions cloud/src/meta-service/meta_service_tablet_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
7); // aggregate + data_size + num_rows + num_rowsets + num_segments + index_size + segment_size

std::unique_ptr<RangeGetIterator> it;
do {
while (it == nullptr /* may be not init */ || it->more()) {
TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand All @@ -64,7 +64,7 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
std::string {v.data(), v.size()});
}
begin_key = it->next_begin_key();
} while (it->more());
}

if (stats_kvs.empty()) {
code = MetaServiceCode::TABLET_NOT_FOUND;
Expand Down
16 changes: 8 additions & 8 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ void scan_tmp_rowset(
};

std::unique_ptr<RangeGetIterator> it;
do {
while (it == nullptr /* may be not init */ || it->more()) {
err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
if (err == TxnErrorCode::TXN_TOO_OLD) {
err = txn_kv->create_txn(&txn);
Expand Down Expand Up @@ -1169,7 +1169,7 @@ void scan_tmp_rowset(
if (!it->has_next()) rs_tmp_key0 = k;
}
rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}

VLOG_DEBUG << "txn_id=" << txn_id << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta->size();
return;
Expand Down Expand Up @@ -3805,7 +3805,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll
int64_t abort_txn_cnt = 0;
int64_t total_iteration_cnt = 0;
bool need_commit = false;
do {
while (it == nullptr /* may be not init */ || it->more()) {
err = txn->get(begin_info_key, end_info_key, &it, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand Down Expand Up @@ -3847,7 +3847,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll
}
}
begin_info_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
LOG(INFO) << "abort txn count: " << abort_txn_cnt
<< " total iteration count: " << total_iteration_cnt;
if (need_commit) {
Expand Down Expand Up @@ -3924,7 +3924,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
int64_t skip_timeout_txn_cnt = 0;
int total_iteration_cnt = 0;
bool finished = true;
do {
while (it == nullptr /* may be not init */ || it->more()) {
err = txn->get(begin_running_key, end_running_key, &it, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
Expand Down Expand Up @@ -4002,7 +4002,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
}
}
begin_running_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
}
LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
<< " conflict txn count: " << response->conflict_txns_size()
<< " total iteration count: " << total_iteration_cnt;
Expand Down Expand Up @@ -4195,7 +4195,7 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
bool snapshot = true;
int limit = 1000;
TEST_SYNC_POINT_CALLBACK("clean_txn_label:limit", &limit);
do {
while (it == nullptr /* may be not init */ || it->more()) {
std::unique_ptr<Transaction> txn;
auto err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
Expand Down Expand Up @@ -4241,7 +4241,7 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
}
}
begin_label_key.push_back('\x00');
} while (it->more());
}
} else {
const std::string& label = request->labels(0);
const std::string label_key = txn_label_key({instance_id, db_id, label});
Expand Down
9 changes: 4 additions & 5 deletions cloud/src/meta-store/blob_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,16 @@ TxnErrorCode ValueBuf::get(Transaction* txn, std::string_view key, bool snapshot
}
begin_key = it->next_begin_key();
iters.push_back(std::move(it));
do {
while (it == nullptr /* may be not init */ || more) {
err = txn->get(begin_key, end_key, &it, snapshot);
if (err != TxnErrorCode::TXN_OK) {
return err;
}
begin_key = it->next_begin_key();
more = it->more();
if (more) {
begin_key = it->next_begin_key();
}
iters.push_back(std::move(it));
} while (more);
if (!more) break;
}
return TxnErrorCode::TXN_OK;
}

Expand Down
2 changes: 1 addition & 1 deletion cloud/src/meta-store/txn_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ TxnErrorCode Transaction::get_conflicting_range(

FDBKeyValue const* out_kvs;
int out_kvs_count;
fdb_bool_t out_more;
fdb_bool_t out_more = false;
do {
fdb_error_t err =
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
Expand Down
Loading