From dd23402c5c361797184f5093b1cbb358c347816e Mon Sep 17 00:00:00 2001 From: lamb Date: Thu, 28 Mar 2024 15:44:59 +0800 Subject: [PATCH 1/7] fix test --- regression-test/suites/table_p0/test_table_version.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/table_p0/test_table_version.groovy b/regression-test/suites/table_p0/test_table_version.groovy index 250dcf96cbbcb3..3b60a49b1c4807 100644 --- a/regression-test/suites/table_p0/test_table_version.groovy +++ b/regression-test/suites/table_p0/test_table_version.groovy @@ -49,7 +49,7 @@ suite("test_table_version") { assertEquals(2, visibleVersion); sql """ - alter table ${tableNameNum} drop partition p201703_all; + alter table ${tableNameNum} drop partition p201703_all force; """ visibleVersion = getTableVersion(dbId,tableNameNum); assertEquals(3, visibleVersion); From 118a3f9dc13b9e716ad26c40087d4dee0be96100 Mon Sep 17 00:00:00 2001 From: lamb Date: Thu, 28 Mar 2024 20:59:54 +0800 Subject: [PATCH 2/7] save --- .../meta-service/meta_service_partition.cpp | 44 +++++++++++++++-- cloud/test/meta_service_test.cpp | 48 +++++++++++++++++++ .../org/apache/doris/catalog/OlapTable.java | 4 +- .../doris/cloud/catalog/CloudPartition.java | 4 +- .../VersionHelper.java} | 7 ++- gensrc/proto/cloud.proto | 2 +- 6 files changed, 95 insertions(+), 14 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/cloud/{qe/SnapshotProxy.java => rpc/VersionHelper.java} (95%) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index b3f610c0888f17..1d4547667d8f18 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -599,12 +599,46 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } if (!need_commit) return; - // update table versions + // Update table version only when deleting non-empty partitions if (request->has_db_id()) { - std::string ver_key = - table_version_key({instance_id, request->db_id(), request->table_id()}); - txn->atomic_add(ver_key, 1); - LOG_INFO("update table version").tag("ver_key", hex(ver_key)); + bool needUpdateTableVersion = false; + for (auto part_id : request->partition_ids()) { + std::string partition_ver_key = partition_version_key({ + instance_id, request->db_id(), request->table_id(), part_id}); + std::string ver_val_str; + err = txn->get(partition_ver_key, &ver_val_str); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as(err); + ss << "failed to get partition version, table_id=" << request->table_id() + << "partition_id=" << part_id << " key=" << hex(partition_ver_key); + msg = ss.str(); + return; + } + int64_t part_version = -1; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // may be it is an empty partition + part_version = 1; + } else { + VersionPB pb; + if (!pb.ParseFromString(ver_val_str)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "malformed partition version value"; + LOG_WARNING(msg).tag("partition_id", part_id); + return; + } + part_version = pb.version(); + } + if (part_version > 1) { + needUpdateTableVersion = true; + break; + } + } + if (needUpdateTableVersion) { + std::string ver_key = + table_version_key({instance_id, request->db_id(), request->table_id()}); + txn->atomic_add(table_ver_key, 1); + LOG_INFO("update table version").tag("ver_key", hex(ver_key)); + } } err = txn->commit(); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index a9b40ad15436bc..536eedfa197640 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -4489,6 +4489,8 @@ TEST(MetaServiceTest, PartitionRequest) { auto partition_key = recycle_partition_key({instance_id, partition_id}); int64_t val_int = 0; auto tbl_version_key = table_version_key({instance_id, 1, table_id}); + VersionPB version_pb; + auto part_version_key = partition_version_key({instance_id, 1, table_id, partition_id}); std::string val; // ------------Test prepare partition------------ brpc::Controller ctrl; @@ -4685,6 +4687,11 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + version_pb.set_version(100); + val = version_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(part_version_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); meta_service->drop_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4701,6 +4708,11 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + version_pb.set_version(100); + val = version_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(part_version_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::PREPARED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4717,11 +4729,42 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); val_int = *reinterpret_cast(val.data()); ASSERT_EQ(val_int, 2); + // Last state PREPARED but drop an empty partition + reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + version_pb.set_version(1); + val = version_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(part_version_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + partition_pb.set_state(RecyclePartitionPB::PREPARED); + val = partition_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(partition_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + res.Clear(); + meta_service->drop_partition(&ctrl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); + ASSERT_TRUE(partition_pb.ParseFromString(val)); + ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state DROPPED reset_meta_service(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + version_pb.set_version(100); + val = version_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(part_version_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::DROPPED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4743,6 +4786,11 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + version_pb.set_version(100); + val = version_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(part_version_key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::RECYCLING); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 6243912204d477..d06212e6959ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -38,7 +38,7 @@ import org.apache.doris.clone.TabletScheduler; import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.proto.Cloud; -import org.apache.doris.cloud.qe.SnapshotProxy; +import org.apache.doris.cloud.rpc.VersionHelper; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -2775,7 +2775,7 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque throws RpcException { long startAt = System.nanoTime(); try { - return SnapshotProxy.getVisibleVersion(req); + return VersionHelper.getVisibleVersion(req); } finally { SummaryProfile profile = getSummaryProfile(); if (profile != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index 8f72415e1de1c5..0f613b6c442d61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -22,7 +22,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; -import org.apache.doris.cloud.qe.SnapshotProxy; +import org.apache.doris.cloud.rpc.VersionHelper; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -328,7 +328,7 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque throws RpcException { long startAt = System.nanoTime(); try { - return SnapshotProxy.getVisibleVersion(req); + return VersionHelper.getVisibleVersion(req); } finally { SummaryProfile profile = getSummaryProfile(); if (profile != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java rename to fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java index 86bc19ecbeceb9..cf5ccceef23b09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.cloud.qe; +package org.apache.doris.cloud.rpc; import org.apache.doris.cloud.proto.Cloud; -import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.Config; import org.apache.doris.rpc.RpcException; @@ -30,8 +29,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class SnapshotProxy { - private static final Logger LOG = LogManager.getLogger(SnapshotProxy.class); +public class VersionHelper { + private static final Logger LOG = LogManager.getLogger(VersionHelper.class); public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException { int tryTimes = 0; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f9080a978b10d8..7cc9b68dbb53ea 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -872,7 +872,7 @@ message IndexRequest { optional int64 table_id = 3; optional int64 expiration = 4; optional int64 db_id = 5; - optional bool is_new_table = 6; + optional bool is_new_table = 6; // For table version } message IndexResponse { From 94bd5dde094e419de374081a3a8087277e632199 Mon Sep 17 00:00:00 2001 From: lamb Date: Sun, 31 Mar 2024 20:28:41 +0800 Subject: [PATCH 3/7] save --- cloud/src/meta-service/meta_service_partition.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 1d4547667d8f18..a8d23ebbd773e9 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -612,6 +612,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll ss << "failed to get partition version, table_id=" << request->table_id() << "partition_id=" << part_id << " key=" << hex(partition_ver_key); msg = ss.str(); + LOG_WARNING(msg); return; } int64_t part_version = -1; From c714e99d0dcb9e74fd4639fa149dd6c12156a704 Mon Sep 17 00:00:00 2001 From: lamb Date: Sun, 31 Mar 2024 20:39:06 +0800 Subject: [PATCH 4/7] print code in msg --- cloud/src/meta-service/meta_service_partition.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index a8d23ebbd773e9..40d04de13d73d9 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -610,7 +610,8 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { code = cast_as(err); ss << "failed to get partition version, table_id=" << request->table_id() - << "partition_id=" << part_id << " key=" << hex(partition_ver_key); + << " partition_id=" << part_id << " key=" << hex(partition_ver_key) + << " code=" << code; msg = ss.str(); LOG_WARNING(msg); return; From a20c9b1b2f26d815351a658c1d8a9bb959cd08d6 Mon Sep 17 00:00:00 2001 From: lamb Date: Mon, 8 Apr 2024 11:05:49 +0800 Subject: [PATCH 5/7] save --- cloud/src/meta-service/meta_service_partition.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 40d04de13d73d9..e784ee5a5cad3b 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -601,7 +601,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll // Update table version only when deleting non-empty partitions if (request->has_db_id()) { - bool needUpdateTableVersion = false; + bool need_update_table_version = false; for (auto part_id : request->partition_ids()) { std::string partition_ver_key = partition_version_key({ instance_id, request->db_id(), request->table_id(), part_id}); @@ -631,11 +631,11 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll part_version = pb.version(); } if (part_version > 1) { - needUpdateTableVersion = true; + need_update_table_version = true; break; } } - if (needUpdateTableVersion) { + if (need_update_table_version) { std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); txn->atomic_add(table_ver_key, 1); From 41adcae268db8ccb38287be1dbbda8d52e405d15 Mon Sep 17 00:00:00 2001 From: lamb Date: Mon, 8 Apr 2024 13:30:23 +0800 Subject: [PATCH 6/7] save --- cloud/src/meta-service/meta_service_partition.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index e784ee5a5cad3b..6603028725e901 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -611,7 +611,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll code = cast_as(err); ss << "failed to get partition version, table_id=" << request->table_id() << " partition_id=" << part_id << " key=" << hex(partition_ver_key) - << " code=" << code; + << " err=" << err; msg = ss.str(); LOG_WARNING(msg); return; From e16c82991cfe1682ad4e77d197b5fd04c6197dce Mon Sep 17 00:00:00 2001 From: lamb Date: Mon, 8 Apr 2024 15:02:58 +0800 Subject: [PATCH 7/7] save --- cloud/src/meta-service/meta_service_partition.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 6603028725e901..cfac793ff2fef0 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -638,7 +638,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll if (need_update_table_version) { std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); - txn->atomic_add(table_ver_key, 1); + txn->atomic_add(ver_key, 1); LOG_INFO("update table version").tag("ver_key", hex(ver_key)); } }