Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ void commit_txn_immediately(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << "msg=" << msg;
<< " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
Expand Down Expand Up @@ -1654,7 +1654,7 @@ void commit_txn_eventually(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << "msg=" << msg;
<< " code=" << code << " msg=" << msg;
return;
}

Expand Down
60 changes: 48 additions & 12 deletions cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,25 @@ void TxnLazyCommitTask::commit() {
code_ = MetaServiceCode::OK;
msg_.clear();
int64_t db_id;
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowset_metas;
scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_, &db_id, &tmp_rowset_metas);
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> all_tmp_rowset_metas;
scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_, &db_id,
&all_tmp_rowset_metas);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ << " code=" << code_;
break;
}

VLOG_DEBUG << "txn_id=" << txn_id_
<< " tmp_rowset_metas.size()=" << tmp_rowset_metas.size();
if (tmp_rowset_metas.size() == 0) {
<< " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size();
if (all_tmp_rowset_metas.size() == 0) {
LOG(INFO) << "empty tmp_rowset_metas, txn_id=" << txn_id_;
}

// <partition_id, tmp_rowsets>
std::unordered_map<int64_t,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) {
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
Expand All @@ -346,7 +347,6 @@ void TxnLazyCommitTask::commit() {
}
if (code_ != MetaServiceCode::OK) break;

DCHECK(tmp_rowset_metas.size() > 0);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
Expand All @@ -358,13 +358,39 @@ void TxnLazyCommitTask::commit() {
}

int64_t table_id = -1;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
if (table_id <= 0) {
table_id = tablet_ids[tmp_rowset_pb.tablet_id()].table_id();
DCHECK(tmp_rowset_metas.size() > 0);
if (table_id <= 0) {
if (tablet_ids.size() > 0) {
// get table_id from memory cache
table_id = tablet_ids.begin()->second.table_id();
} else {
// get table_id from storage
int64_t first_tablet_id = tmp_rowset_metas.begin()->second.tablet_id();
std::string tablet_idx_key =
meta_tablet_idx_key({instance_id_, first_tablet_id});
std::string tablet_idx_val;
err = txn->get(tablet_idx_key, &tablet_idx_val, true);
if (TxnErrorCode::TXN_OK != err) {
code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get tablet idx, txn_id=" << txn_id_
<< " key=" << hex(tablet_idx_key) << " err=" << err;
msg_ = ss.str();
LOG(WARNING) << msg_;
break;
}

TabletIndexPB tablet_idx_pb;
if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse tablet idx pb txn_id=" << txn_id_
<< " key=" << hex(tablet_idx_key);
msg_ = ss.str();
break;
}
table_id = tablet_idx_pb.table_id();
}
txn->remove(tmp_rowset_key);
LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key)
<< " txn_id=" << txn_id_;
}

DCHECK(table_id > 0);
Expand Down Expand Up @@ -415,6 +441,12 @@ void TxnLazyCommitTask::commit() {
LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" << txn_id_
<< " version_pb=" << version_pb.ShortDebugString();

for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
txn->remove(tmp_rowset_key);
LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key)
<< " txn_id=" << txn_id_;
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code_ = cast_as<ErrCategory::COMMIT>(err);
Expand All @@ -424,6 +456,10 @@ void TxnLazyCommitTask::commit() {
}
}
}
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << " msg=" << msg_;
break;
}
make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_);
} while (false);
} while (code_ == MetaServiceCode::KV_TXN_CONFLICT &&
Expand Down