Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1310,15 +1310,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,

while (it->has_next()) {
auto [k, v] = it->next();
auto rs = response->add_rowset_meta();
auto* rs = response->add_rowset_meta();
auto byte_size = rs->ByteSizeLong();
TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size);
if (byte_size + v.size() > std::numeric_limits<int32_t>::max()) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format(
"rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}",
hex(k), byte_size);
LOG(WARNING) << msg;
return;
}
if (!rs->ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed rowset meta, unable to deserialize";
msg = "malformed rowset meta, unable to serialize";
LOG(WARNING) << msg << " key=" << hex(k);
return;
}
++num_rowsets;
if (!it->has_next()) key0 = k;
if (!it->has_next()) {
key0 = k;
}
}
key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
Expand Down
90 changes: 89 additions & 1 deletion cloud/test/txn_lazy_commit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <random>
#include <string>
Expand Down Expand Up @@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
ASSERT_TRUE(abort_timeout_txn_hit);
ASSERT_EQ(txn_id, txn_info_pb.txn_id());
}
} // namespace doris::cloud

TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
auto txn_kv = get_mem_txn_kv();

int64_t db_id = 5252025;
int64_t table_id = 35201043384;
int64_t index_id = 256439;
int64_t partition_id = 732536259;

auto meta_service = get_meta_service(txn_kv, true);
int64_t tablet_id = 25910248;

{
create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
tablet_id);
}
{
int tmp_txn_id = 0;
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label_32ae213dasg3");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
tmp_txn_id = res.txn_id();
ASSERT_GT(res.txn_id(), 0);
}
{
auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(tmp_txn_id);
req.set_is_2pc(false);
req.set_enable_txn_lazy_commit(true);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
}

auto* sp = SyncPoint::get_instance();
sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) {
auto* byte_size = try_any_cast<size_t*>(args[0]);
*byte_size = std::numeric_limits<int32_t>::max();
++(*byte_size);
});

sp->enable_processing();
{
brpc::Controller cntl;
GetRowsetRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
auto* tablet_idx = req.mutable_idx();
tablet_idx->set_table_id(table_id);
tablet_idx->set_index_id(index_id);
tablet_idx->set_partition_id(partition_id);
tablet_idx->set_tablet_id(tablet_id);
req.set_start_version(0);
req.set_end_version(-1);
req.set_cumulative_compaction_cnt(0);
req.set_base_compaction_cnt(0);
req.set_cumulative_point(2);

GetRowsetResponse res;
meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
&res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR);
}
}

} // namespace doris::cloud
Loading