From 3d35bed88aa8fbc7f14df32fc69002bbe8243d0b Mon Sep 17 00:00:00 2001 From: Kang Date: Fri, 20 Oct 2023 10:20:19 -0500 Subject: [PATCH] Revert "[improvement](sync version) fe sync version with be (#25236) (#25574)" This reverts commit 66974a89e79d2b2806ebe7910b0d656847af3ee4. --- .../org/apache/doris/catalog/Replica.java | 36 +--- .../doris/catalog/TabletInvertedIndex.java | 22 +-- .../apache/doris/clone/TabletSchedCtx.java | 7 - .../org/apache/doris/master/MasterImpl.java | 9 +- .../apache/doris/master/ReportHandler.java | 42 +---- .../apache/doris/clone/RepairVersionTest.java | 176 ------------------ .../doris/utframe/TestWithFeService.java | 2 +- 7 files changed, 20 insertions(+), 274 deletions(-) delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 631f2ebaf3ba05..e55eab8939216f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,7 +20,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -115,14 +114,6 @@ public enum ReplicaStatus { private TUniqueId cooldownMetaId; private long cooldownTerm = -1; - // A replica version should increase monotonically, - // but backend may missing some versions due to disk failure or bugs. - // FE should found these and mark the replica as missing versions. - // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, - // and if time exceed 5min, fe should mark this replica as missing versions. - private long regressiveVersion = -1; - private long regressiveVersionTimestamp = 0; - /* * This can happen when this replica is created by a balance clone task, and * when task finished, the version of this replica is behind the partition's visible version. @@ -444,9 +435,9 @@ private void updateReplicaInfo(long newVersion, if (lastFailedVersion != this.lastFailedVersion) { // Case 2: - if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { + if (lastFailedVersion > this.lastFailedVersion) { this.lastFailedVersion = lastFailedVersion; - this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L; + this.lastFailedTimestamp = System.currentTimeMillis(); } this.lastSuccessVersion = this.version; @@ -515,6 +506,10 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) { return true; } + public void setLastFailedVersion(long lastFailedVersion) { + this.lastFailedVersion = lastFailedVersion; + } + public void setState(ReplicaState replicaState) { this.state = replicaState; } @@ -539,25 +534,6 @@ public void setVersionCount(long versionCount) { this.versionCount = versionCount; } - public boolean checkVersionRegressive(long newVersion) { - if (newVersion >= version) { - regressiveVersion = -1; - regressiveVersionTimestamp = -1; - return false; - } - - if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { - return true; - } - - if (newVersion != regressiveVersion) { - regressiveVersion = newVersion; - regressiveVersionTimestamp = System.currentTimeMillis(); - } - - return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; - } - @Override public String toString() { StringBuilder strBuffer = new StringBuilder("[replicaId="); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index a2d5983aac41c9..2b601f9f0306a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -390,22 +390,10 @@ private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) { if (backendTabletInfo.getVersion() > versionInFe) { // backend replica's version is larger or newer than replica in FE, sync it. return true; - } else if (versionInFe == backendTabletInfo.getVersion()) { + } else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) { // backend replica's version is equal to replica in FE, but replica in FE is bad, // while backend replica is good, sync it - if (replicaInFe.isBad()) { - return true; - } - - // FE' s replica last failed version > partition's committed version - // this can be occur when be report miss version, fe will set last failed version = visible version + 1 - // then last failed version may greater than partition's committed version - // - // But here cannot got variable partition, we just check lastFailedVersion = version + 1, - // In ReportHandler.sync, we will check if last failed version > partition's committed version again. - if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { - return true; - } + return true; } return false; @@ -513,12 +501,6 @@ private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo // so we only return true if version_miss is true. return true; } - - // backend versions regressive due to bugs - if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { - return true; - } - return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 3f52210e8f1324..b4667f80696d75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1074,13 +1074,6 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), reportedTablet.getDataSize(), reportedTablet.getRowCount()); - if (replica.getLastFailedVersion() > partition.getCommittedVersion() - && reportedTablet.getVersion() >= partition.getCommittedVersion() - //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() - && !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) { - LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId); - replica.updateLastFailedVersion(-1L); - } if (reportedTablet.isSetPathHash()) { replica.setPathHash(reportedTablet.getPathHash()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 64b771663b2465..2833eff5f3d0e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -192,7 +192,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) { finishRecoverTablet(task); break; case ALTER: - finishAlterTask(task, request); + finishAlterTask(task); break; case ALTER_INVERTED_INDEX: finishAlterInvertedIndexTask(task, request); @@ -575,7 +575,7 @@ public TMasterResult report(TReportRequest request) throws TException { return reportHandler.handleReport(request); } - private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { + private void finishAlterTask(AgentTask task) { AlterReplicaTask alterTask = (AlterReplicaTask) task; try { if (alterTask.getJobType() == JobType.ROLLUP) { @@ -584,11 +584,6 @@ private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask); } alterTask.setFinished(true); - if (request.isSetReportVersion()) { - long reportVersion = request.getReportVersion(); - Env.getCurrentSystemInfo().updateBackendReportVersion( - task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); - } } catch (MetaNotFoundException e) { LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 2de0c4e5050585..bfe1bb0a9e2850 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -403,8 +403,7 @@ private static void diffResource(List storageResourcesInBe, Li } } - // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { + private static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -608,11 +607,6 @@ private static void sync(Map backendTablets, ListMultimap backendTablets, ListMultimap= partition.getCommittedVersion() - && replica.getLastFailedVersion() > partition.getCommittedVersion()) { - LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed" - + " version change to -1 because last failed version > replica's committed" - + " version {}", - replica, tabletId, backendId, dbId, partition.getCommittedVersion()); - replica.updateLastFailedVersion(-1L); - needSync = true; + if (metaVersion < backendVersion + || (metaVersion == backendVersion && replica.isBad())) { + + if (backendReportVersion < Env.getCurrentSystemInfo() + .getBackendReportVersion(backendId)) { + continue; } - } - if (needSync) { // happens when // 1. PUSH finished in BE but failed or not yet report to FE // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE @@ -1065,25 +1048,18 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM break; } - if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) - || replica.checkVersionRegressive(tTabletInfo.getVersion())) { + if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { // If the origin last failed version is larger than 0, not change it. // Otherwise, we set last failed version to replica'version + 1. // Because last failed version should always larger than replica's version. long newLastFailedVersion = replica.getLastFailedVersion(); if (newLastFailedVersion < 0) { newLastFailedVersion = replica.getVersion() + 1; - replica.updateLastFailedVersion(newLastFailedVersion); - LOG.warn("set missing version for replica {} of tablet {} on backend {}, " - + "version in fe {}, version in be {}, be missing {}", - replica.getId(), tabletId, backendId, replica.getVersion(), - tTabletInfo.getVersion(), tTabletInfo.isVersionMiss()); } + replica.updateLastFailedVersion(newLastFailedVersion); backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion); break; } - - break; } } } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java deleted file mode 100644 index 7539548583c502..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ /dev/null @@ -1,176 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.clone; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.common.Config; -import org.apache.doris.common.util.DebugPointUtil; -import org.apache.doris.common.util.DebugPointUtil.DebugPoint; -import org.apache.doris.master.ReportHandler; -import org.apache.doris.thrift.TTablet; -import org.apache.doris.thrift.TTabletInfo; -import org.apache.doris.utframe.TestWithFeService; - -import com.google.common.collect.Maps; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.Map; - -public class RepairVersionTest extends TestWithFeService { - private class TableInfo { - Partition partition; - Tablet tablet; - Replica replica; - } - - @Override - protected void beforeCreatingConnectContext() throws Exception { - Config.enable_debug_points = true; - Config.disable_balance = true; - Config.disable_tablet_scheduler = true; - Config.allow_replica_on_same_host = true; - Config.tablet_checker_interval_ms = 100; - Config.tablet_schedule_interval_ms = 100; - } - - @Override - protected void runBeforeAll() throws Exception { - createDatabase("test"); - } - - @Override - protected int backendNum() { - return 2; - } - - @Test - public void testRepairLastFailedVersionByClone() throws Exception { - TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone"); - Partition partition = info.partition; - Replica replica = info.replica; - - replica.updateLastFailedVersion(replica.getVersion() + 1); - Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); - - Config.disable_tablet_scheduler = false; - Thread.sleep(1000); - Config.disable_tablet_scheduler = true; - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - } - - @Test - public void testRepairLastFailedVersionByReport() throws Exception { - TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report"); - Partition partition = info.partition; - Tablet tablet = info.tablet; - Replica replica = info.replica; - - replica.updateLastFailedVersion(replica.getVersion() + 1); - Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); - - TTabletInfo tTabletInfo = new TTabletInfo(); - tTabletInfo.setTabletId(tablet.getId()); - tTabletInfo.setSchemaHash(replica.getSchemaHash()); - tTabletInfo.setVersion(replica.getVersion()); - tTabletInfo.setPathHash(replica.getPathHash()); - tTabletInfo.setPartitionId(partition.getId()); - tTabletInfo.setReplicaId(replica.getId()); - - TTablet tTablet = new TTablet(); - tTablet.addToTabletInfos(tTabletInfo); - Map tablets = Maps.newHashMap(); - tablets.put(tablet.getId(), tTablet); - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - } - - @Test - public void testVersionRegressive() throws Exception { - TableInfo info = prepareTableForTest("tbl_version_regressive"); - Partition partition = info.partition; - Tablet tablet = info.tablet; - Replica replica = info.replica; - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - Assertions.assertTrue(replica.getVersion() > 1L); - - TTabletInfo tTabletInfo = new TTabletInfo(); - tTabletInfo.setTabletId(tablet.getId()); - tTabletInfo.setSchemaHash(replica.getSchemaHash()); - tTabletInfo.setVersion(1L); // be report version = 1 which less than fe version - tTabletInfo.setPathHash(replica.getPathHash()); - tTabletInfo.setPartitionId(partition.getId()); - tTabletInfo.setReplicaId(replica.getId()); - - TTablet tTablet = new TTablet(); - tTablet.addToTabletInfos(tTabletInfo); - Map tablets = Maps.newHashMap(); - tablets.put(tablet.getId(), tTablet); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - - DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - } - - private TableInfo prepareTableForTest(String tableName) throws Exception { - createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED BY HASH(k) " - + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )"); - - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test"); - OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName); - Assertions.assertNotNull(tbl); - Partition partition = tbl.getPartitions().iterator().next(); - Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() - .getTablets().iterator().next(); - - long visibleVersion = 2L; - partition.updateVisibleVersion(visibleVersion); - partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); - - Replica replica = tablet.getReplicas().iterator().next(); - Assertions.assertEquals(visibleVersion, replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - - TableInfo info = new TableInfo(); - info.partition = partition; - info.tablet = tablet; - info.replica = replica; - - return info; - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ac4fa2660db7ea..a17fcdf72bc699 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -392,7 +392,7 @@ protected void createDorisCluster(String runningDir, int backendNum) InterruptedException { int feRpcPort = startFEServer(runningDir); List bes = Lists.newArrayList(); - System.out.println("start create backend, backend num " + backendNum); + System.out.println("start create backend"); for (int i = 0; i < backendNum; i++) { bes.add(createBackend("127.0.0.1", feRpcPort)); }