Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");

CONF_mBool(enable_distinguish_hdfs_path, "true");

// If enabled, the txn status will be checked when preapre/commit rowset
CONF_mBool(enable_load_txn_status_check, "true");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
Expand Down
97 changes: 96 additions & 1 deletion cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,73 @@ static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg,
existed_rowset_meta->CopyFrom(metas.Get(0));
}

/**
* Check if the transaction status is as expected.
* If the transaction is not in the expected state, return false and set the error code and message.
*
* @param expect_status The expected transaction status.
* @param txn Pointer to the transaction object.
* @param instance_id The instance ID associated with the transaction.
* @param txn_id The transaction ID to check.
* @param code Reference to the error code to be set in case of failure.
* @param msg Reference to the error message to be set in case of failure.
* @return true if the transaction status matches the expected status, false otherwise.
*/
static bool check_transaction_status(TxnStatusPB expect_status, Transaction* txn,
const std::string& instance_id, int64_t txn_id,
MetaServiceCode& code, std::string& msg) {
// Get db id with txn id
std::string index_val;
const std::string index_key = txn_index_key({instance_id, txn_id});
TxnErrorCode err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err);
return false;
}

TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
return false;
}

DCHECK(index_pb.has_tablet_index() == true);
DCHECK(index_pb.tablet_index().has_db_id() == true);
if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
LOG(WARNING) << fmt::format(
"txn_index_pb is malformed, tablet_index has no db_id, txn_id={}", txn_id);
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id);
return false;
}
auto db_id = index_pb.tablet_index().db_id();
txn_id = index_pb.has_parent_txn_id() ? index_pb.parent_txn_id() : txn_id;

const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val;
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get txn, txn_id={}, err={}", txn_id, err);
return false;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("failed to parse txn_info, db_id={} txn_id={}", db_id, txn_id);
return false;
}
if (txn_info.status() != expect_status) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("txn is not in {} state, txn_id={}, txn_status={}", expect_status, txn_id,
txn_info.status());
return false;
}
return true;
}

/**
* 1. Check and confirm tmp rowset kv does not exist
* 2. Construct recycle rowset kv which contains object path
Expand Down Expand Up @@ -1032,6 +1099,20 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
return;
}

// Check if the prepare rowset request is invalid.
// If the transaction has been finished, it means this prepare rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
// If the rowset had load id, it means it is a load request, otherwise it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id,
rowset_meta.txn_id(), code, msg)) {
LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
return;
}

// Check if commit key already exists.
std::string val;
err = txn->get(tmp_rs_key, &val);
Expand Down Expand Up @@ -1155,6 +1236,20 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}

// Check if the commit rowset request is invalid.
// If the transaction has been finished, it means this commit rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
// If the rowset has load id, it means it is a load request, otherwise it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id,
rowset_meta.txn_id(), code, msg)) {
LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
return;
}

// Check if commit key already exists.
std::string existed_commit_val;
err = txn->get(tmp_rs_key, &existed_commit_val);
Expand Down Expand Up @@ -3408,4 +3503,4 @@ void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
response->mutable_schema_dict()->Swap(&schema_dict);
}

} // namespace doris::cloud
} // namespace doris::cloud
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3167,6 +3167,7 @@ void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
index_pb.set_parent_txn_id(txn_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
Expand Down
67 changes: 67 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9260,4 +9260,71 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
}

TEST(MetaServiceTest, StalePrepareRowset) {
auto meta_service = get_meta_service();

int64_t table_id = 1;
int64_t partition_id = 1;
int64_t tablet_id = 1;
int64_t db_id = 100201;
std::string label = "test_prepare_rowset";
create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);

int64_t txn_id = 0;
ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
CreateRowsetResponse res;
auto rowset = create_rowset(txn_id, tablet_id, partition_id);
rowset.mutable_load_id()->set_hi(123);
rowset.mutable_load_id()->set_lo(456);
prepare_rowset(meta_service.get(), rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
res.Clear();
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;

prepare_rowset(meta_service.get(), rowset, res);
ASSERT_TRUE(res.status().msg().find("rowset already exists") != std::string::npos)
<< res.status().msg();
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().code();

commit_txn(meta_service.get(), db_id, txn_id, label);
prepare_rowset(meta_service.get(), rowset, res);
ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
<< res.status().msg();
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
}

TEST(MetaServiceTest, StaleCommitRowset) {
auto meta_service = get_meta_service();

int64_t table_id = 1;
int64_t partition_id = 1;
int64_t tablet_id = 1;
int64_t db_id = 100201;
std::string label = "test_prepare_rowset";
create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);

int64_t txn_id = 0;
ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
CreateRowsetResponse res;
auto rowset = create_rowset(txn_id, tablet_id, partition_id);
rowset.mutable_load_id()->set_hi(123);
rowset.mutable_load_id()->set_lo(456);
prepare_rowset(meta_service.get(), rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
res.Clear();
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;

ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;

commit_txn(meta_service.get(), db_id, txn_id, label);
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
<< res.status().msg();
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
}

} // namespace doris::cloud
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ message TxnLabelPB {
// txn_id -> db_id
message TxnIndexPB {
optional TabletIndexPB tablet_index = 1;
optional int64 parent_txn_id = 2;
}

message TxnInfoPB {
Expand Down
Loading