Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions cloud/src/meta-service/meta_service_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,48 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
}
if (!need_commit) return;

// update table versions
// Update table version only when deleting non-empty partitions
if (request->has_db_id()) {
std::string ver_key =
table_version_key({instance_id, request->db_id(), request->table_id()});
txn->atomic_add(ver_key, 1);
LOG_INFO("update table version").tag("ver_key", hex(ver_key));
bool need_update_table_version = false;
for (auto part_id : request->partition_ids()) {
std::string partition_ver_key = partition_version_key({
instance_id, request->db_id(), request->table_id(), part_id});
std::string ver_val_str;
err = txn->get(partition_ver_key, &ver_val_str);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get partition version, table_id=" << request->table_id()
<< " partition_id=" << part_id << " key=" << hex(partition_ver_key)
<< " err=" << err;
msg = ss.str();
LOG_WARNING(msg);
return;
}
int64_t part_version = -1;
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
// may be it is an empty partition
part_version = 1;
} else {
VersionPB pb;
if (!pb.ParseFromString(ver_val_str)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed partition version value";
LOG_WARNING(msg).tag("partition_id", part_id);
return;
}
part_version = pb.version();
}
if (part_version > 1) {
need_update_table_version = true;
break;
}
}
if (need_update_table_version) {
std::string ver_key =
table_version_key({instance_id, request->db_id(), request->table_id()});
txn->atomic_add(ver_key, 1);
LOG_INFO("update table version").tag("ver_key", hex(ver_key));
}
}

err = txn->commit();
Expand Down
48 changes: 48 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4489,6 +4489,8 @@ TEST(MetaServiceTest, PartitionRequest) {
auto partition_key = recycle_partition_key({instance_id, partition_id});
int64_t val_int = 0;
auto tbl_version_key = table_version_key({instance_id, 1, table_id});
VersionPB version_pb;
auto part_version_key = partition_version_key({instance_id, 1, table_id, partition_id});
std::string val;
// ------------Test prepare partition------------
brpc::Controller ctrl;
Expand Down Expand Up @@ -4685,6 +4687,11 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->atomic_add(tbl_version_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
version_pb.set_version(100);
val = version_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(part_version_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
meta_service->drop_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
Expand All @@ -4701,6 +4708,11 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->atomic_add(tbl_version_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
version_pb.set_version(100);
val = version_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(part_version_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
partition_pb.set_state(RecyclePartitionPB::PREPARED);
val = partition_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
Expand All @@ -4717,11 +4729,42 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
val_int = *reinterpret_cast<const int64_t*>(val.data());
ASSERT_EQ(val_int, 2);
// Last state PREPARED but drop an empty partition
reset_meta_service();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->atomic_add(tbl_version_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
version_pb.set_version(1);
val = version_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(part_version_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
partition_pb.set_state(RecyclePartitionPB::PREPARED);
val = partition_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(partition_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
res.Clear();
meta_service->drop_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK);
ASSERT_TRUE(partition_pb.ParseFromString(val));
ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED);
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
val_int = *reinterpret_cast<const int64_t*>(val.data());
ASSERT_EQ(val_int, 1);
// Last state DROPPED
reset_meta_service();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->atomic_add(tbl_version_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
version_pb.set_version(100);
val = version_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(part_version_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
partition_pb.set_state(RecyclePartitionPB::DROPPED);
val = partition_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
Expand All @@ -4743,6 +4786,11 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->atomic_add(tbl_version_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
version_pb.set_version(100);
val = version_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(part_version_key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
partition_pb.set_state(RecyclePartitionPB::RECYCLING);
val = partition_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.SnapshotProxy;
import org.apache.doris.cloud.rpc.VersionHelper;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand Down Expand Up @@ -2775,7 +2775,7 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque
throws RpcException {
long startAt = System.nanoTime();
try {
return SnapshotProxy.getVisibleVersion(req);
return VersionHelper.getVisibleVersion(req);
} finally {
SummaryProfile profile = getSummaryProfile();
if (profile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.qe.SnapshotProxy;
import org.apache.doris.cloud.rpc.VersionHelper;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
Expand Down Expand Up @@ -328,7 +328,7 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque
throws RpcException {
long startAt = System.nanoTime();
try {
return SnapshotProxy.getVisibleVersion(req);
return VersionHelper.getVisibleVersion(req);
} finally {
SummaryProfile profile = getSummaryProfile();
if (profile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.qe;
package org.apache.doris.cloud.rpc;

import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.rpc.RpcException;

Expand All @@ -30,8 +29,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SnapshotProxy {
private static final Logger LOG = LogManager.getLogger(SnapshotProxy.class);
public class VersionHelper {
private static final Logger LOG = LogManager.getLogger(VersionHelper.class);

public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException {
int tryTimes = 0;
Expand Down
2 changes: 1 addition & 1 deletion gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ message IndexRequest {
optional int64 table_id = 3;
optional int64 expiration = 4;
optional int64 db_id = 5;
optional bool is_new_table = 6;
optional bool is_new_table = 6; // For table version
}

message IndexResponse {
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/table_p0/test_table_version.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ suite("test_table_version") {
assertEquals(2, visibleVersion);

sql """
alter table ${tableNameNum} drop partition p201703_all;
alter table ${tableNameNum} drop partition p201703_all force;
"""
visibleVersion = getTableVersion(dbId,tableNameNum);
assertEquals(3, visibleVersion);
Expand Down