diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 2cadc0efe1f70f..718d718f4fbf94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -71,6 +71,7 @@ import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.persist.ClusterInfo; import org.apache.doris.persist.ColocatePersistInfo; @@ -454,6 +455,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_BATCH_REMOVE_TXNS_V2: { + data = BatchRemoveTransactionsOperationV2.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_REPOSITORY: { data = Repository.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java new file mode 100644 index 00000000000000..0371a61bc05b8b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +// Persist the info when removing batch of expired txns +public class BatchRemoveTransactionsOperationV2 implements Writable { + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "latestTxnIdForShort") + private long latestTxnIdForShort; + + @SerializedName(value = "latestTxnIdForLong") + private long latestTxnIdForLong; + + public BatchRemoveTransactionsOperationV2(long dbId, long latestTxnIdForShort, long latestTxnIdForLong) { + this.dbId = dbId; + this.latestTxnIdForShort = latestTxnIdForShort; + this.latestTxnIdForLong = latestTxnIdForLong; + } + + public long getDbId() { + return dbId; + } + + public long getLatestTxnIdForShort() { + return latestTxnIdForShort; + } + + public long getLatestTxnIdForLong() { + return latestTxnIdForLong; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static BatchRemoveTransactionsOperationV2 read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BatchRemoveTransactionsOperationV2.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 8d4d0212cdf0a8..fc40a2dc577566 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -557,6 +557,12 @@ public static void loadJournal(Env env, JournalEntity journal) { Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation); break; } + case OperationType.OP_BATCH_REMOVE_TXNS_V2: { + final BatchRemoveTransactionsOperationV2 operation = + (BatchRemoveTransactionsOperationV2) journal.getData(); + Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactionV2(operation); + break; + } case OperationType.OP_CREATE_REPOSITORY: { Repository repository = (Repository) journal.getData(); env.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true); @@ -1595,8 +1601,8 @@ public void logReplaceTable(ReplaceTableOperationLog log) { logEdit(OperationType.OP_REPLACE_TABLE, log); } - public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation op) { - logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op); + public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) { + logEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op); } public void logModifyComment(ModifyCommentOperationLog op) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index a8dfb590786982..3b4337449c96bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -161,12 +161,15 @@ public class OperationType { //real time load 100 -108 public static final short OP_UPSERT_TRANSACTION_STATE = 100; @Deprecated - // use OP_BATCH_REMOVE_TXNS instead + // use OP_BATCH_REMOVE_TXNS_V2 instead public static final short OP_DELETE_TRANSACTION_STATE = 101; public static final short OP_FINISHING_ROLLUP = 102; public static final short OP_FINISHING_SCHEMA_CHANGE = 103; public static final short OP_SAVE_TRANSACTION_ID = 104; + @Deprecated + // use OP_BATCH_REMOVE_TXNS_V2 instead public static final short OP_BATCH_REMOVE_TXNS = 105; + public static final short OP_BATCH_REMOVE_TXNS_V2 = 106; // routine load 110~120 public static final short OP_ROUTINE_LOAD_JOB = 110; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index ed5c0cff7f2ece..29fcf2b296efdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -46,7 +46,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; @@ -742,6 +742,33 @@ public void replayBatchRemoveTransaction(List txnIds) { } } + public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 operation) { + writeLock(); + try { + if (operation.getLatestTxnIdForShort() != -1) { + while (!finalStatusTransactionStateDequeShort.isEmpty()) { + TransactionState transactionState = finalStatusTransactionStateDequeShort.pop(); + clearTransactionState(transactionState.getTransactionId()); + if (operation.getLatestTxnIdForShort() == transactionState.getTransactionId()) { + break; + } + } + } + + if (operation.getLatestTxnIdForLong() != -1) { + while (!finalStatusTransactionStateDequeLong.isEmpty()) { + TransactionState transactionState = finalStatusTransactionStateDequeLong.pop(); + clearTransactionState(transactionState.getTransactionId()); + if (operation.getLatestTxnIdForLong() == transactionState.getTransactionId()) { + break; + } + } + } + } finally { + writeUnlock(); + } + } + public TransactionStatus getLabelState(String label) { readLock(); try { @@ -1368,23 +1395,21 @@ protected List> getPartitionTransInfo(long txnId, long tableId) } public void removeExpiredTxns(long currentMillis) { - List expiredTxnIds = Lists.newArrayList(); // delete expired txns - int leftNum = MAX_REMOVE_TXN_PER_ROUND; writeLock(); try { - leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, - finalStatusTransactionStateDequeShort, leftNum); - leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, - finalStatusTransactionStateDequeLong, leftNum); - - if (!expiredTxnIds.isEmpty()) { - Map> dbExpiredTxnIds = Maps.newHashMap(); - dbExpiredTxnIds.put(dbId, expiredTxnIds); - BatchRemoveTransactionsOperation op = new BatchRemoveTransactionsOperation(dbExpiredTxnIds); + Pair expiredTxnsInfoForShort = unprotectedRemoveExpiredTxns(currentMillis, + finalStatusTransactionStateDequeShort, MAX_REMOVE_TXN_PER_ROUND); + Pair expiredTxnsInfoForLong = unprotectedRemoveExpiredTxns(currentMillis, + finalStatusTransactionStateDequeLong, + MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second); + int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; + if (numOfClearedTransaction > 0) { + BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId, + expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first); editLog.logBatchRemoveTransactions(op); if (LOG.isDebugEnabled()) { - LOG.debug("Remove {} expired transactions", MAX_REMOVE_TXN_PER_ROUND - leftNum); + LOG.debug("Remove {} expired transactions", numOfClearedTransaction); } } } finally { @@ -1392,22 +1417,22 @@ public void removeExpiredTxns(long currentMillis) { } } - private int unprotectedRemoveExpiredTxns(long currentMillis, List expiredTxnIds, - ArrayDeque finalStatusTransactionStateDequeShort, - int maxNumber) { - int left = maxNumber; - while (!finalStatusTransactionStateDequeShort.isEmpty() && left > 0) { - TransactionState transactionState = finalStatusTransactionStateDequeShort.getFirst(); + private Pair unprotectedRemoveExpiredTxns(long currentMillis, + ArrayDeque finalStatusTransactionStateDeque, int left) { + long latestTxnId = -1; + int numOfClearedTransaction = 0; + while (!finalStatusTransactionStateDeque.isEmpty() && numOfClearedTransaction < left) { + TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); if (transactionState.isExpired(currentMillis)) { - finalStatusTransactionStateDequeShort.pop(); + finalStatusTransactionStateDeque.pop(); clearTransactionState(transactionState.getTransactionId()); - expiredTxnIds.add(transactionState.getTransactionId()); - left--; + latestTxnId = transactionState.getTransactionId(); + numOfClearedTransaction++; } else { break; } } - return left; + return Pair.of(latestTxnId, numOfClearedTransaction); } private void clearTransactionState(long txnId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 3c8405678944fc..18bf57bb19462d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -35,6 +35,7 @@ import org.apache.doris.metric.GaugeMetricImpl; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TUniqueId; @@ -441,6 +442,15 @@ public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation opera } } + public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(operation.getDbId()); + dbTransactionMgr.replayBatchRemoveTransaction(operation); + } catch (AnalysisException e) { + LOG.warn("replay batch remove transactions failed. db " + operation.getDbId(), e); + } + } + public List> getDbInfo() { List> infos = new ArrayList>(); List dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java index bffcc9c4f54e11..08800510a1a6ef 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -20,7 +20,7 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.BatchAlterJobPersistInfo; import org.apache.doris.cluster.Cluster; -import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.RoutineLoadOperation; @@ -91,7 +91,7 @@ public void logDynamicPartition(ModifyTablePropertyOperationLog info) { } @Mock - public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation info) { + public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 info) { }