From 6df8a3fd3eced978c189c19fcc85f5a3bd4028c6 Mon Sep 17 00:00:00 2001 From: yujun Date: Mon, 26 Aug 2024 21:26:01 +0800 Subject: [PATCH] fix replay update replica info --- .../org/apache/doris/alter/AlterHandler.java | 3 +- .../org/apache/doris/backup/RestoreJob.java | 3 +- .../org/apache/doris/catalog/Replica.java | 44 ++++++++----------- .../apache/doris/catalog/TabletStatMgr.java | 9 ++-- .../apache/doris/clone/TabletSchedCtx.java | 6 ++- .../doris/datasource/InternalCatalog.java | 6 ++- .../apache/doris/master/ReportHandler.java | 5 ++- .../transaction/DatabaseTransactionMgr.java | 2 +- .../apache/doris/alter/RollupJobV2Test.java | 10 +---- .../doris/alter/SchemaChangeJobV2Test.java | 6 +-- .../doris/analysis/ShowReplicaTest.java | 3 +- .../org/apache/doris/catalog/ReplicaTest.java | 18 ++++---- .../clone/DiskReblanceWhenSchedulerIdle.java | 3 +- .../doris/clone/RebalancerTestUtil.java | 5 ++- .../apache/doris/clone/RepairVersionTest.java | 2 +- .../apache/doris/planner/QueryPlanTest.java | 18 +++++--- 16 files changed, 74 insertions(+), 69 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index dbc43059e5b65d..21329ad832f5ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -241,8 +241,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce task.getSignature(), replica, task.getVersion()); boolean versionChanged = false; if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), replica.getDataSize(), replica.getRemoteDataSize(), - replica.getRowCount()); + replica.updateVersion(task.getVersion()); versionChanged = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index e52aab9382306a..79e49a2b09c809 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1765,8 +1765,7 @@ private Status allTabletCommitted(boolean isReplay) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) { - replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(), - replica.getRemoteDataSize(), replica.getRowCount()); + replica.updateVersion(part.getVisibleVersion()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 0d36228e999438..70dffaa16ecc14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -223,14 +223,26 @@ public long getDataSize() { return dataSize; } + public void setDataSize(long dataSize) { + this.dataSize = dataSize; + } + public long getRemoteDataSize() { return remoteDataSize; } + public void setRemoteDataSize(long remoteDataSize) { + this.remoteDataSize = remoteDataSize; + } + public long getRowCount() { return rowCount; } + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + public long getLastFailedVersion() { return lastFailedVersion; } @@ -311,28 +323,13 @@ public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; } - // for compatibility - public synchronized void updateStat(long dataSize, long rowNum) { - this.dataSize = dataSize; - this.rowCount = rowNum; - } - - public synchronized void updateStat(long dataSize, long remoteDataSize, long rowNum, long versionCount) { - this.dataSize = dataSize; - this.remoteDataSize = remoteDataSize; - this.rowCount = rowNum; - this.versionCount = versionCount; - } - - public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize, - long newRowCount) { - updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize, - newRowCount); + public synchronized void updateVersion(long newVersion) { + updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion); } - public synchronized void updateVersionWithFailedInfo( + public synchronized void updateVersionWithFailed( long newVersion, long lastFailedVersion, long lastSuccessVersion) { - updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(newVersion, lastFailedVersion, lastSuccessVersion); } public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, @@ -395,9 +392,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. * We just reset the LFV(hash) to recovery this replica. */ - private void updateReplicaInfo(long newVersion, - long lastFailedVersion, long lastSuccessVersion, - long newDataSize, long newRemoteDataSize, long newRowCount) { + private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { if (LOG.isDebugEnabled()) { LOG.debug("before update: {}", this.toString()); } @@ -422,9 +417,6 @@ private void updateReplicaInfo(long newVersion, long oldLastFailedVersion = this.lastFailedVersion; this.version = newVersion; - this.dataSize = newDataSize; - this.remoteDataSize = newRemoteDataSize; - this.rowCount = newRowCount; // just check it if (lastSuccessVersion <= this.version) { @@ -485,7 +477,7 @@ private void updateReplicaInfo(long newVersion, } public synchronized void updateLastFailedVersion(long lastFailedVersion) { - updateReplicaInfo(this.version, lastFailedVersion, this.lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion); } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index feae4f589fdc9c..092bf84a6617db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -144,8 +144,10 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); if (replica != null) { - replica.updateStat(stat.getDataSize(), stat.getRemoteDataSize(), stat.getRowNum(), - stat.getVersionCount()); + replica.setDataSize(stat.getDataSize()); + replica.setRemoteDataSize(stat.getRemoteDataSize()); + replica.setRowCount(stat.getRowNum()); + replica.setVersionCount(stat.getVersionCount()); } } } @@ -161,7 +163,8 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { continue; } // TODO(cmy) no db lock protected. I think it is ok even we get wrong row num - replica.updateStat(entry.getValue().getDataSize(), entry.getValue().getRowNum()); + replica.setDataSize(entry.getValue().getDataSize()); + replica.setRowCount(entry.getValue().getRowNum()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 3544da69e88001..747457ce813183 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1136,8 +1136,10 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) "replica does not exist. backend id: " + destBackendId); } - replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), - reportedTablet.getRemoteDataSize(), reportedTablet.getRowCount()); + replica.updateVersion(reportedTablet.getVersion()); + replica.setDataSize(reportedTablet.getDataSize()); + replica.setRemoteDataSize(reportedTablet.getRemoteDataSize()); + replica.setRowCount(reportedTablet.getRowCount()); if (replica.getLastFailedVersion() > partition.getCommittedVersion() && reportedTablet.getVersion() >= partition.getCommittedVersion() //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 7a123e50fad0bc..d89abe348898e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1046,7 +1046,11 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info Tablet tablet = materializedIndex.getTablet(info.getTabletId()); Replica replica = tablet.getReplicaByBackendId(info.getBackendId()); Preconditions.checkNotNull(replica, info); - replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRemoteDataSize(), info.getRowCount()); + replica.updateVersionWithFailed(info.getVersion(), info.getLastFailedVersion(), + info.getLastSuccessVersion()); + replica.setDataSize(info.getDataSize()); + replica.setRemoteDataSize(info.getRemoteDataSize()); + replica.setRowCount(info.getRowCount()); replica.setBad(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 125f2553ec935e..9ead1a56ff893e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -681,7 +681,10 @@ private static void sync(Map backendTablets, ListMultimap= current version and last success version <= new version, then last success version should be updated Assert.assertEquals(3, originalReplica.getLastSuccessVersion()); Assert.assertEquals(3, originalReplica.getVersion()); @@ -155,7 +157,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update last success version 10 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 10); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); @@ -163,7 +165,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update version to 8, the last success version and version should be 10 - originalReplica.updateVersionInfo(8, 100, 0, 78); + originalReplica.updateVersion(8); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); Assert.assertEquals(10, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); @@ -175,7 +177,7 @@ public void testUpdateVersion3() { Assert.assertEquals(12, originalReplica.getLastFailedVersion()); // update last success version to 15 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 15); Assert.assertEquals(15, originalReplica.getLastSuccessVersion()); @@ -189,13 +191,13 @@ public void testUpdateVersion3() { Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 17 then version and success version is 17 - originalReplica.updateVersionInfo(17, 100, 0, 78); + originalReplica.updateVersion(17L); Assert.assertEquals(17, originalReplica.getLastSuccessVersion()); Assert.assertEquals(17, originalReplica.getVersion()); Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 18, then version and last success version should be 18 and failed version should be -1 - originalReplica.updateVersionInfo(18, 100, 0, 78); + originalReplica.updateVersion(18L); Assert.assertEquals(18, originalReplica.getLastSuccessVersion()); Assert.assertEquals(18, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java index 8d65bffd10b8b5..9c32d9b0a353c6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -108,7 +108,8 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception { Lists.newArrayList(tablet.getReplicas()).forEach( replica -> { if (replica.getBackendId() == backends.get(1).getId()) { - replica.updateStat(totalCapacity / 4, 1); + replica.setDataSize(totalCapacity / 4); + replica.setRowCount(1L); tablet.deleteReplica(replica); replica.setBackendId(backends.get(0).getId()); replica.setPathHash(diskInfo0.getPathHash()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index da03d42a644cba..2883c88d07b795 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -103,7 +103,7 @@ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex replica.setPathHash(beIds.get(i)); if (replicaSizes != null) { // for disk rebalancer, every beId corresponding to a replicaSize - replica.updateStat(replicaSizes.get(i), 0); + replica.setDataSize(replicaSizes.get(i)); } // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex tablet.addReplica(replica, true); @@ -164,7 +164,8 @@ public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, in for (Tablet tablet : idx.getTablets()) { long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew)); for (Replica replica : tablet.getReplicas()) { - replica.updateStat(tabletSize, 1000L); + replica.setDataSize(tabletSize); + replica.setRowCount(1000L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 7564ba1d6f53b5..f182131315a9ee 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -89,7 +89,7 @@ private TableInfo prepareTableForTest(String tableName) throws Exception { long visibleVersion = 2L; partition.updateVisibleVersion(visibleVersion); partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + tablet.getReplicas().forEach(replica -> replica.updateVersion(visibleVersion)); Replica replica = tablet.getReplicas().iterator().next(); Assertions.assertEquals(visibleVersion, replica.getVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 2e44dcec9e2aee..e56fe7c5de75ba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1099,7 +1099,9 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2L); + replica.setDataSize(200000L); + replica.setRowCount(10000L); } } } @@ -1113,7 +1115,9 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2L); + replica.setDataSize(200000L); + replica.setRowCount(10000L); } } } @@ -1197,7 +1201,9 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2L); + replica.setDataSize(200000L); + replica.setRowCount(10000L); } } } @@ -1227,7 +1233,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2L); } } } @@ -1247,7 +1253,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2L); } } } @@ -1276,7 +1282,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2L); } } }