Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cloud/script/run_all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ for i in *_test; do
if [[ "${fdb}" != "" ]]; then
patchelf --set-rpath "$(pwd)" "${i}"
fi

LLVM_PROFILE_FILE="./report/${i}.profraw"
set -euo pipefail
if [[ "${filter}" == "" ]]; then
LLVM_PROFILE_FILE="./report/${i}.profraw" "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml"
"./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml"
else
LLVM_PROFILE_FILE="./report/${i}.profraw" "./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" --gtest_filter="${filter}"
"./${i}" --gtest_print_time=true --gtest_output="xml:${i}.xml" --gtest_filter="${filter}"
fi
set +euo pipefail
unittest_files[${#unittest_files[*]}]="${i}"
Expand Down
23 changes: 22 additions & 1 deletion cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ static void register_suites() {
}
});
});

// Suite "sleep_before_execute_commit_rowset": when this suite's lambda runs, it
// installs a callback on the "MetaServiceImpl::commit_rowset" sync point. The
// first invocation of that callback sleeps for `duration_ms`; every later
// invocation only logs a warning and returns immediately.
suite_map.emplace("sleep_before_execute_commit_rowset", []() {
    int duration_ms = 15000;
    // Invocation counter. It is captured by value (shared ownership) in the
    // callback below, so it stays alive after this lambda returns.
    auto hit = std::make_shared<std::atomic_long>(0);
    LOG(INFO) << "register injection point sleep_before_execute_commit_rowset sleep for "
              << duration_ms << " ms";
    auto* sp = SyncPoint::get_instance();

    sp->set_call_back("MetaServiceImpl::commit_rowset", [duration_ms, hit](auto&& args) {
        // fetch_add returns the previous value, so only the very first caller sees 0.
        if (hit->fetch_add(1) == 0) {
            const auto* req = try_any_cast<const CreateRowsetRequest*>(args[0]);
            LOG(INFO) << "hit injection point sleep_before_execute_commit_rowset sleep for "
                      << duration_ms << " ms, request=" << req->ShortDebugString();
            std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
            return;
        }
        LOG(WARNING) << "injection point sleep_before_execute_commit_rowset without sleep, hit="
                     << hit->load();
    });
});

suite_map.emplace("Transaction::commit.enable_inject", []() {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Transaction::commit.inject_random_fault", [](auto&& args) {
Expand Down Expand Up @@ -366,4 +387,4 @@ HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller*

return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op);
}
} // namespace doris::cloud
} // namespace doris::cloud
11 changes: 11 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,7 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
CreateRowsetResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_rowset);
TEST_INJECTION_POINT_CALLBACK("MetaServiceImpl::commit_rowset", request);
if (!request->has_rowset_meta()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no rowset meta";
Expand Down Expand Up @@ -1222,6 +1223,16 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
}

auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id});
std::string recycle_rs_val;
err = txn->get(recycle_rs_key, &recycle_rs_val);
if (err != TxnErrorCode::TXN_OK) {
std::stringstream ss;
ss << "failed to get recycle_rowset_key when commit rowset instance_id=" << instance_id
<< " tablet_id=" << tablet_id << " rowset_id=" << rowset_id << " ret=" << err;
code = cast_as<ErrCategory::READ>(err);
msg = ss.str();
return;
}
txn->remove(recycle_rs_key);

DCHECK_GT(rowset_meta.txn_expiration(), 0);
Expand Down
1 change: 1 addition & 0 deletions cloud/src/recycler/hdfs_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) {
}

int HdfsAccessor::init() {
TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init", 0);
TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1);
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
Expand Down
11 changes: 9 additions & 2 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,7 @@ int InstanceRecycler::recycle_rowsets() {
return final_expiration;
};

auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int {
auto handle_recycle_rowset_kv = [&](std::string_view k, std::string_view v) -> int {
++num_scanned;
total_rowset_key_size += k.size();
total_rowset_value_size += v.size();
Expand All @@ -2008,6 +2008,13 @@ int InstanceRecycler::recycle_rowsets() {
if (current_time < calc_expiration(rowset)) { // not expired
return 0;
}
// TODO(gavin): The recycle key marks this rowset as expired, i.e. it should be recycled.
// However, the rowset may still be referenced by an in-flight load, schema
// change (SC), or compaction.
// To resolve this, use a key-value transaction to read the recycle key and
// update its status to EXPIRED, so that the update conflicts with concurrent
// commit procedures and prevents them from using a rowset being recycled.

++num_expired;
expired_rowset_size += v.size();
if (!rowset.has_type()) { // old version `RecycleRowsetPB`
Expand Down Expand Up @@ -2097,7 +2104,7 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};

int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_recycle_rowset_kv),
std::move(loop_done));
worker_pool->stop();

Expand Down
2 changes: 1 addition & 1 deletion cloud/test/fdb_injection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ int main(int argc, char** argv) {
[](auto&& args) { *try_any_cast<uint64_t*>(args[0]) = 0; });
sp->set_call_back("put_schema_kv:schema_key_exists_return",
[](auto&& args) { *try_any_cast<bool*>(args.back()) = true; });
sp->set_call_back("resource_manager::set_safe_drop_time",
sp->set_call_back("resource_manager::set_safe_drop_time", // make it always safe to drop
[](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; });

meta_service = create_meta_service();
Expand Down
61 changes: 53 additions & 8 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ doris::RowsetMetaCloudPB create_rowset(int64_t tablet_id, int64_t start_version,
return rowset;
}

/**
 * Test helper: wraps `rowset` into a CreateRowsetRequest and issues a
 * prepare_rowset call on `meta_service`, passing no completion closure.
 * The service's reply is written into `res`.
 */
static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res) {
    // Build the request first; it carries a copy of the caller's rowset meta.
    CreateRowsetRequest request;
    auto* request_rowset_meta = request.mutable_rowset_meta();
    request_rowset_meta->CopyFrom(rowset);

    brpc::Controller controller;
    meta_service->prepare_rowset(&controller, &request, &res, /*done=*/nullptr);
}

void commit_rowset(MetaService* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res) {
brpc::Controller cntl;
Expand Down Expand Up @@ -2353,6 +2361,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobTest) {
for (int64_t i = 0; i < 5; ++i) {
output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2));
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i;
}
Expand Down Expand Up @@ -2430,6 +2441,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobTest) {
output_rowsets.push_back(create_rowset(new_tablet_id, 13, 13));
for (auto& rs : output_rowsets) {
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), rs, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), rs, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << rs.end_version();
}
Expand Down Expand Up @@ -2566,8 +2580,12 @@ TEST(MetaServiceJobTest, RetrySchemaChangeJobTest) {
be1_output_rowsets.push_back(create_rowset(new_tablet_id, 11, 11));
for (auto& rs : be1_output_rowsets) {
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), rs, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), rs, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << rs.end_version();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
<< "end_version=" << rs.end_version() << res.status().ShortDebugString();
}
sc_res.Clear();
// FE thinks BE1 is not alive and retries "job2" on BE2, should preempt "job2" created by BE1
Expand All @@ -2579,17 +2597,28 @@ TEST(MetaServiceJobTest, RetrySchemaChangeJobTest) {
{
CreateRowsetResponse res;
// [2-8] has committed by BE1
commit_rowset(meta_service.get(), create_rowset(new_tablet_id, 2, 8), res);
auto rs_meta = create_rowset(new_tablet_id, 2, 8);
prepare_rowset(meta_service.get(), rs_meta, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
<< res.status().ShortDebugString();
res.Clear();
commit_rowset(meta_service.get(), rs_meta, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
ASSERT_EQ(res.existed_rowset_meta().rowset_id_v2(), be1_output_rowsets[0].rowset_id_v2());
be2_output_rowsets.push_back(res.existed_rowset_meta());
res.Clear();
be2_output_rowsets.push_back(create_rowset(new_tablet_id, 9, 12));
prepare_rowset(meta_service.get(), be2_output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), be2_output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
be2_output_rowsets.push_back(create_rowset(new_tablet_id, 13, 13));
prepare_rowset(meta_service.get(), be2_output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), be2_output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -2711,6 +2740,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) {
for (int64_t i = 0; i < 5; ++i) {
output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2));
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i;
}
Expand Down Expand Up @@ -2762,6 +2794,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) {
for (int64_t i = 0; i < 5; ++i) {
output_rowsets.push_back(create_rowset(new_tablet_id, i + 2, i + 2));
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), output_rowsets.back(), res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << i;
}
Expand Down Expand Up @@ -2916,9 +2951,14 @@ TEST(MetaServiceJobTest, ConcurrentCompactionTest) {
{
// Provide output rowset
auto output_rowset = create_rowset(tablet_id, 5, 10);
CreateRowsetResponse rowset_res;
commit_rowset(meta_service.get(), output_rowset, rowset_res);
ASSERT_EQ(rowset_res.status().code(), MetaServiceCode::OK);
{
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), output_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), output_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

FinishTabletJobRequest req;
FinishTabletJobResponse res;
Expand Down Expand Up @@ -3028,9 +3068,14 @@ TEST(MetaServiceJobTest, ConcurrentCompactionTest) {
{
// Provide output rowset
auto output_rowset = create_rowset(tablet_id, 2, 4);
CreateRowsetResponse rowset_res;
commit_rowset(meta_service.get(), output_rowset, rowset_res);
ASSERT_EQ(rowset_res.status().code(), MetaServiceCode::OK);
{
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), output_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), output_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

FinishTabletJobRequest req;
FinishTabletJobResponse res;
Expand Down
35 changes: 34 additions & 1 deletion cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ static void commit_txn(MetaServiceProxy* meta_service, int64_t db_id, int64_t tx
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
}

/**
* Generates a rowset meta represented by RowsetMetaCloudPB
*/
static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
int partition_id = 10, int64_t version = -1,
int num_rows = 100) {
Expand Down Expand Up @@ -1747,6 +1750,9 @@ TEST(MetaServiceTest, CommitTxnTest) {
create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -1845,6 +1851,9 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
create_tablet(meta_service.get(), 1234789234, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -1873,6 +1882,9 @@ static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t tab
create_tablet(meta_service, table_id, index_id, partition_id, tablet_id);
auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id);
CreateRowsetResponse res;
prepare_rowset(meta_service, tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service, tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -2466,6 +2478,9 @@ TEST(MetaServiceTest, AbortTxnTest) {
create_tablet(meta_service.get(), 12345, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -2516,8 +2531,11 @@ TEST(MetaServiceTest, AbortTxnTest) {
create_tablet(meta_service.get(), table_id, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
EXPECT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().ShortDebugString();
}

// abort txn by db_id and label
Expand Down Expand Up @@ -2706,6 +2724,9 @@ TEST(MetaServiceTest, CheckTxnConflictTest) {
create_tablet(meta_service.get(), table_id, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -2977,6 +2998,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) {
create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -3230,6 +3254,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) {
create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -3298,6 +3325,9 @@ TEST(MetaServiceTest, CleanTxnLabelTest) {
create_tablet(meta_service.get(), 1234, 1235, 1236, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -6145,6 +6175,9 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
tmp_rowset.set_partition_id(partition_id);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down
Loading