From 51d23b8bef3eaa3a9e017df562a021607ce30a50 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Sat, 4 Mar 2023 14:53:31 +0800 Subject: [PATCH 1/3] [enhancement](transaction) Reduce hold writeLock time for DatabaseTransactionMgr to clear transaction --- .../apache/doris/journal/JournalEntity.java | 6 ++ .../BatchRemoveTransactionsOperationV2.java | 79 +++++++++++++++++++ .../org/apache/doris/persist/EditLog.java | 10 ++- .../apache/doris/persist/OperationType.java | 5 +- .../transaction/DatabaseTransactionMgr.java | 75 ++++++++++++------ .../transaction/GlobalTransactionMgr.java | 10 +++ 6 files changed, 159 insertions(+), 26 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 2cadc0efe1f70f..718d718f4fbf94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -71,6 +71,7 @@ import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.persist.ClusterInfo; import org.apache.doris.persist.ColocatePersistInfo; @@ -454,6 +455,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_BATCH_REMOVE_TXNS_V2: { + data = BatchRemoveTransactionsOperationV2.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_REPOSITORY: { data = Repository.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java new file mode 100644 index 00000000000000..ce42fa814758de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +// Persist the info when removing batch of expired txns +public class BatchRemoveTransactionsOperationV2 implements Writable { + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "latestTxnIdForShort") + private long latestTxnIdForShort; + + @SerializedName(value = "latestTxnIdForLong") + private long latestTxnIdForLong; + + @SerializedName(value = "numOfClearedTransaction") + private int numOfClearedTransaction; + + public BatchRemoveTransactionsOperationV2(long dbId, long latestTxnIdForShort, long latestTxnIdForLong, + int numOfClearedTransaction) { + this.dbId = dbId; + this.latestTxnIdForShort = latestTxnIdForShort; + this.latestTxnIdForLong = latestTxnIdForLong; + this.numOfClearedTransaction = numOfClearedTransaction; + } + + public long getDbId() { + return dbId; + } + + public long getLatestTxnIdForShort() { + return latestTxnIdForShort; + } + + public long getLatestTxnIdForLong() { + return latestTxnIdForLong; + } + + public int getNumOfClearedTransaction() { + return numOfClearedTransaction; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static BatchRemoveTransactionsOperationV2 read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BatchRemoveTransactionsOperationV2.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 8d4d0212cdf0a8..fc40a2dc577566 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -557,6 +557,12 @@ public static void loadJournal(Env env, JournalEntity journal) { Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation); break; } + case OperationType.OP_BATCH_REMOVE_TXNS_V2: { + final BatchRemoveTransactionsOperationV2 operation = + (BatchRemoveTransactionsOperationV2) journal.getData(); + Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactionV2(operation); + break; + } case OperationType.OP_CREATE_REPOSITORY: { Repository repository = (Repository) journal.getData(); env.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true); @@ -1595,8 +1601,8 @@ public void logReplaceTable(ReplaceTableOperationLog log) { logEdit(OperationType.OP_REPLACE_TABLE, log); } - public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation op) { - logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op); + public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) { + logEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op); } public void logModifyComment(ModifyCommentOperationLog op) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index a8dfb590786982..3b4337449c96bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -161,12 +161,15 @@ public class OperationType { //real time load 100 -108 public static final short OP_UPSERT_TRANSACTION_STATE = 100; @Deprecated - // use OP_BATCH_REMOVE_TXNS instead + // use OP_BATCH_REMOVE_TXNS_V2 instead public static final short OP_DELETE_TRANSACTION_STATE = 101; public static final short OP_FINISHING_ROLLUP = 102; public static final short OP_FINISHING_SCHEMA_CHANGE = 103; public static final short OP_SAVE_TRANSACTION_ID = 104; + @Deprecated + // use OP_BATCH_REMOVE_TXNS_V2 instead public static final short OP_BATCH_REMOVE_TXNS = 105; + public static final short OP_BATCH_REMOVE_TXNS_V2 = 106; // routine load 110~120 public static final short OP_ROUTINE_LOAD_JOB = 110; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index ed5c0cff7f2ece..033ed4de2fa39d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -46,7 +46,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; @@ -742,6 +742,37 @@ public void replayBatchRemoveTransaction(List txnIds) { } } + public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 operation) { + writeLock(); + try { + int numOfClearedTransaction = 0; + if (operation.getLatestTxnIdForShort() != -1) { + while (!finalStatusTransactionStateDequeShort.isEmpty()) { + TransactionState transactionState = finalStatusTransactionStateDequeShort.pop(); + clearTransactionState(transactionState.getTransactionId()); + numOfClearedTransaction++; + if (operation.getLatestTxnIdForShort() == transactionState.getTransactionId()) { + break; + } + } + } + + if (operation.getLatestTxnIdForLong() != -1) { + while (!finalStatusTransactionStateDequeLong.isEmpty()) { + TransactionState transactionState = finalStatusTransactionStateDequeLong.pop(); + clearTransactionState(transactionState.getTransactionId()); + numOfClearedTransaction++; + if (operation.getLatestTxnIdForLong() == transactionState.getTransactionId()) { + break; + } + } + } + Preconditions.checkState(numOfClearedTransaction == operation.getNumOfClearedTransaction()); + } finally { + writeUnlock(); + } + } + public TransactionStatus getLabelState(String label) { readLock(); try { @@ -1368,23 +1399,21 @@ protected List> getPartitionTransInfo(long txnId, long tableId) } public void removeExpiredTxns(long currentMillis) { - List expiredTxnIds = Lists.newArrayList(); // delete expired txns - int leftNum = MAX_REMOVE_TXN_PER_ROUND; writeLock(); try { - leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, - finalStatusTransactionStateDequeShort, leftNum); - leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, - finalStatusTransactionStateDequeLong, leftNum); - - if (!expiredTxnIds.isEmpty()) { - Map> dbExpiredTxnIds = Maps.newHashMap(); - dbExpiredTxnIds.put(dbId, expiredTxnIds); - BatchRemoveTransactionsOperation op = new BatchRemoveTransactionsOperation(dbExpiredTxnIds); + Pair expiredTxnsInfoForShort = unprotectedRemoveExpiredTxns(currentMillis, + finalStatusTransactionStateDequeShort, MAX_REMOVE_TXN_PER_ROUND); + Pair expiredTxnsInfoForLong = unprotectedRemoveExpiredTxns(currentMillis, + finalStatusTransactionStateDequeLong, + MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second); + int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; + if (numOfClearedTransaction > 0) { + BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId, + expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first, numOfClearedTransaction); editLog.logBatchRemoveTransactions(op); if (LOG.isDebugEnabled()) { - LOG.debug("Remove {} expired transactions", MAX_REMOVE_TXN_PER_ROUND - leftNum); + LOG.debug("Remove {} expired transactions", numOfClearedTransaction); } } } finally { @@ -1392,22 +1421,22 @@ public void removeExpiredTxns(long currentMillis) { } } - private int unprotectedRemoveExpiredTxns(long currentMillis, List expiredTxnIds, - ArrayDeque finalStatusTransactionStateDequeShort, - int maxNumber) { - int left = maxNumber; - while (!finalStatusTransactionStateDequeShort.isEmpty() && left > 0) { - TransactionState transactionState = finalStatusTransactionStateDequeShort.getFirst(); + private Pair unprotectedRemoveExpiredTxns(long currentMillis, + ArrayDeque finalStatusTransactionStateDeque, int left) { + long latestTxnId = -1; + int numOfClearedTransaction = 0; + while (!finalStatusTransactionStateDeque.isEmpty() && numOfClearedTransaction < left) { + TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); if (transactionState.isExpired(currentMillis)) { - finalStatusTransactionStateDequeShort.pop(); + finalStatusTransactionStateDeque.pop(); clearTransactionState(transactionState.getTransactionId()); - expiredTxnIds.add(transactionState.getTransactionId()); - left--; + latestTxnId = transactionState.getTransactionId(); + numOfClearedTransaction++; } else { break; } } - return left; + return Pair.of(latestTxnId, numOfClearedTransaction); } private void clearTransactionState(long txnId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 3c8405678944fc..18bf57bb19462d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -35,6 +35,7 @@ import org.apache.doris.metric.GaugeMetricImpl; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TUniqueId; @@ -441,6 +442,15 @@ public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation opera } } + public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(operation.getDbId()); + dbTransactionMgr.replayBatchRemoveTransaction(operation); + } catch (AnalysisException e) { + LOG.warn("replay batch remove transactions failed. db " + operation.getDbId(), e); + } + } + public List> getDbInfo() { List> infos = new ArrayList>(); List dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet()); From 1ca965d282da6f13fcc1fed95450599cb6574f85 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Sat, 4 Mar 2023 20:33:33 +0800 Subject: [PATCH 2/3] fix ut --- .../src/test/java/org/apache/doris/catalog/FakeEditLog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java index bffcc9c4f54e11..08800510a1a6ef 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -20,7 +20,7 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.BatchAlterJobPersistInfo; import org.apache.doris.cluster.Cluster; -import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.RoutineLoadOperation; @@ -91,7 +91,7 @@ public void logDynamicPartition(ModifyTablePropertyOperationLog info) { } @Mock - public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation info) { + public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 info) { } From 17adbb946807b742017f26487f10066da0b51b75 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Sun, 5 Mar 2023 17:33:49 +0800 Subject: [PATCH 3/3] remove unnessary field for remove txn bdbje log --- .../persist/BatchRemoveTransactionsOperationV2.java | 11 +---------- .../doris/transaction/DatabaseTransactionMgr.java | 6 +----- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java index ce42fa814758de..0371a61bc05b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java @@ -39,15 +39,10 @@ public class BatchRemoveTransactionsOperationV2 implements Writable { @SerializedName(value = "latestTxnIdForLong") private long latestTxnIdForLong; - @SerializedName(value = "numOfClearedTransaction") - private int numOfClearedTransaction; - - public BatchRemoveTransactionsOperationV2(long dbId, long latestTxnIdForShort, long latestTxnIdForLong, - int numOfClearedTransaction) { + public BatchRemoveTransactionsOperationV2(long dbId, long latestTxnIdForShort, long latestTxnIdForLong) { this.dbId = dbId; this.latestTxnIdForShort = latestTxnIdForShort; this.latestTxnIdForLong = latestTxnIdForLong; - this.numOfClearedTransaction = numOfClearedTransaction; } public long getDbId() { @@ -62,10 +57,6 @@ public long getLatestTxnIdForLong() { return latestTxnIdForLong; } - public int getNumOfClearedTransaction() { - return numOfClearedTransaction; - } - @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 033ed4de2fa39d..29fcf2b296efdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -745,12 +745,10 @@ public void replayBatchRemoveTransaction(List txnIds) { public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 operation) { writeLock(); try { - int numOfClearedTransaction = 0; if (operation.getLatestTxnIdForShort() != -1) { while (!finalStatusTransactionStateDequeShort.isEmpty()) { TransactionState transactionState = finalStatusTransactionStateDequeShort.pop(); clearTransactionState(transactionState.getTransactionId()); - numOfClearedTransaction++; if (operation.getLatestTxnIdForShort() == transactionState.getTransactionId()) { break; } @@ -761,13 +759,11 @@ public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 oper while (!finalStatusTransactionStateDequeLong.isEmpty()) { TransactionState transactionState = finalStatusTransactionStateDequeLong.pop(); clearTransactionState(transactionState.getTransactionId()); - numOfClearedTransaction++; if (operation.getLatestTxnIdForLong() == transactionState.getTransactionId()) { break; } } } - Preconditions.checkState(numOfClearedTransaction == operation.getNumOfClearedTransaction()); } finally { writeUnlock(); } @@ -1410,7 +1406,7 @@ public void removeExpiredTxns(long currentMillis) { int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; if (numOfClearedTransaction > 0) { BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId, - expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first, numOfClearedTransaction); + expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first); editLog.logBatchRemoveTransactions(op); if (LOG.isDebugEnabled()) { LOG.debug("Remove {} expired transactions", numOfClearedTransaction);