From 8f7b49dbcd8219140572846892e6e9485088d6fd Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 25 Sep 2023 10:22:21 +0800 Subject: [PATCH] fix balanced new replica will be removed when load txn continuously and none-stop --- .../java/org/apache/doris/common/Config.java | 12 +++ .../org/apache/doris/catalog/Replica.java | 65 +++++++++++++--- .../apache/doris/clone/TabletSchedCtx.java | 74 ++++++++++++++----- 3 files changed, 122 insertions(+), 29 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 09b9e4abef8622..ef09363f3750d8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -929,6 +929,18 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long tablet_repair_delay_factor_second = 60; + /** + * Timeout, in seconds, for the further repair of a replica created by a balance clone task. + */ + @ConfField(mutable = true, masterOnly = true) + public static long tablet_further_repair_timeout_second = 20 * 60; + + /** + * Max number of further repair attempts for a replica created by a balance clone task. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static int tablet_further_repair_max_times = 5; + /** * the default slot number per path for hdd in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 2f82bd26a3ecbe..e55eab8939216f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.thrift.TUniqueId; @@ -114,21 +115,25 @@ public enum ReplicaStatus { private long cooldownTerm = -1; /* - * If set to true, with means this replica need to be repaired. explicitly. * This can happen when this replica is created by a balance clone task, and * when task finished, the version of this replica is behind the partition's visible version. * So this replica need a further repair. * If we do not do this, this replica will be treated as version stale, and will be removed, * so that the balance task is failed, which is unexpected. * - * furtherRepairSetTime set alone with needFurtherRepair. + * furtherRepairSetTime and leftFurtherRepairCount are set along with needFurtherRepair. * This is an insurance, in case that further repair task always fail. If 20 min passed * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false. */ - private boolean needFurtherRepair = false; private long furtherRepairSetTime = -1; - private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min + private int leftFurtherRepairCount = 0; + // During full clone, the replica's state is CLONE, it will not load the data. 
+ // After full clone finished, even if the replica's version = partition's visible version, + // it can still miss txns that were prepared while it was in CLONE state and get committed later. + // notice: furtherRepairWatermarkTxnTd is used when cloning a replica, to protect it from being removed. + // + private long furtherRepairWatermarkTxnTd = -1; /* Decommission a backend B, steps are as follow: * 1. wait peer backends catchup with B; @@ -136,6 +141,8 @@ public enum ReplicaStatus { * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now. * 4. wait txn before postWatermarkTxnId finished, delete B. * + * notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica. + * */ private long preWatermarkTxnId = -1; private long postWatermarkTxnId = -1; @@ -263,15 +270,35 @@ public void setCooldownTerm(long cooldownTerm) { } public boolean needFurtherRepair() { - if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) { - return true; - } - return false; + return leftFurtherRepairCount > 0 + && System.currentTimeMillis() < furtherRepairSetTime + + Config.tablet_further_repair_timeout_second * 1000; } public void setNeedFurtherRepair(boolean needFurtherRepair) { - this.needFurtherRepair = needFurtherRepair; - this.furtherRepairSetTime = System.currentTimeMillis(); + if (needFurtherRepair) { + furtherRepairSetTime = System.currentTimeMillis(); + leftFurtherRepairCount = Config.tablet_further_repair_max_times; + } else { + leftFurtherRepairCount = 0; + furtherRepairSetTime = -1; + } + } + + public void incrFurtherRepairCount() { + leftFurtherRepairCount--; + } + + public int getLeftFurtherRepairCount() { + return leftFurtherRepairCount; + } + + public long getFurtherRepairWatermarkTxnTd() { + return furtherRepairWatermarkTxnTd; + } + + public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { + this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; } // for compatibility @@ -300,6 +327,7 @@ public synchronized void 
updateVersionWithFailedInfo( public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, long updateTime) { + long oldLastFailedVersion = this.lastFailedVersion; if (version != null) { this.version = version; } @@ -324,6 +352,13 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer if (this.lastSuccessVersion < this.version) { this.lastSuccessVersion = this.version; } + if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { + LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { + LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } } /* last failed version: LFV @@ -374,6 +409,8 @@ private void updateReplicaInfo(long newVersion, return; } + long oldLastFailedVersion = this.lastFailedVersion; + this.version = newVersion; this.dataSize = newDataSize; this.remoteDataSize = newRemoteDataSize; @@ -427,6 +464,14 @@ private void updateReplicaInfo(long newVersion, if (LOG.isDebugEnabled()) { LOG.debug("after update {}", this.toString()); } + + if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { + LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { + LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } } public synchronized void updateLastFailedVersion(long lastFailedVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index f3c03b3e1e558e..86b252c12cf35d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -188,6 +188,7 @@ public enum State { private Replica tempSrcReplica = null; private long destBackendId = -1; private long destPathHash = -1; + private long destOldVersion = -1; // for disk balance to set migration task's datadir private String destPath = null; private String errMsg = null; @@ -912,12 +913,12 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { // if this is a balance task, or this is a repair task with // REPLICA_MISSING/REPLICA_RELOCATING, // we create a new replica with state CLONE - long replicaId = 0; + Replica replica = null; if (tabletStatus == TabletStatus.REPLICA_MISSING || tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE || tabletStatus == TabletStatus.COLOCATE_MISMATCH || tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) { - Replica cloneReplica = new Replica( + replica = new Replica( Env.getCurrentEnv().getNextId(), destBackendId, -1 /* version */, schemaHash, -1 /* data size */, -1, -1 /* row count */, @@ -925,15 +926,13 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { committedVersion, /* use committed version as last failed version */ -1 /* last success version */); - LOG.info("create clone task to make new replica, tabletId={}, replicaId={}", tabletId, - cloneReplica.getId()); // addReplica() method will add this replica to tablet inverted index too. 
- tablet.addReplica(cloneReplica); - replicaId = cloneReplica.getId(); - } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) { + tablet.addReplica(replica); + } else { + // tabletStatus is VERSION_INCOMPLETE || NEED_FURTHER_REPAIR Preconditions.checkState(type == Type.REPAIR, type); // double check - Replica replica = tablet.getReplicaByBackendId(destBackendId); + replica = tablet.getReplicaByBackendId(destBackendId); if (replica == null) { throw new SchedException(Status.SCHEDULE_FAILED, "dest replica does not exist on BE " + destBackendId); } @@ -942,17 +941,18 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. " + "current: " + replica.getPathHash() + ", scheduled: " + destPathHash); } - replicaId = replica.getId(); } TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort()); cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId, - replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium, + replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium, visibleVersion, (int) (taskTimeoutMs / 1000)); + destOldVersion = replica.getVersion(); cloneTask.setPathHash(srcPathHash, destPathHash); - LOG.info("create clone task to repair replica, tabletId={}, replicaId={}", tabletId, replicaId); + LOG.info("create clone task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}", + tabletId, replica, visibleVersion, tabletStatus); this.state = State.RUNNING; return cloneTask; @@ -1078,16 +1078,51 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) replica.setPathHash(reportedTablet.getPathHash()); } - if (this.type == Type.BALANCE) { - long partitionVisibleVersion = partition.getVisibleVersion(); - if (replica.getVersion() < 
partitionVisibleVersion) { - // see comment 'needFurtherRepair' of Replica for explanation. - // no need to persist this info. If FE restart, just do it again. - replica.setNeedFurtherRepair(true); + if (type == Type.BALANCE) { + replica.setNeedFurtherRepair(true); + try { + long furtherRepairWatermarkTxnTd = Env.getCurrentGlobalTransactionMgr() + .getTransactionIDGenerator().getNextTransactionId(); + replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd); + LOG.info("new replica {} of tablet {} set further repair watermark id {}", + replica, tabletId, furtherRepairWatermarkTxnTd); + } catch (Exception e) { + LOG.warn("new replica {} set further repair watermark id failed", replica, e); } - } else { + } + + // isCatchup should check that the txns started during ReplicaState CLONE have finished. + // Because when replica's state = CLONE, it will not load txns. + // Even if this replica version = partition visible version, if the txns during CLONE later + // change from prepare to committed or visible, this replica will fall behind and be removed + // in REDUNDANT detection. 
+ // + boolean isCatchup = false; + if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) { + long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd(); + if (furtherRepairWatermarkTxnTd < 0) { + isCatchup = true; + } else { + try { + if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( + furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) { + isCatchup = true; + LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}", + replica, tabletId, furtherRepairWatermarkTxnTd); + } + } catch (Exception e) { + isCatchup = true; + } + } + } + + replica.incrFurtherRepairCount(); + if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) { replica.setNeedFurtherRepair(false); } + if (!replica.needFurtherRepair()) { + replica.setFurtherRepairWatermarkTxnTd(-1); + } ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tblId, partitionId, indexId, tabletId, destBackendId, replica.getId(), @@ -1109,7 +1144,8 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) } state = State.FINISHED; - LOG.info("clone finished: {}", this); + LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}", + this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup); } finally { olapTable.writeUnlock(); }