Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,18 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long tablet_repair_delay_factor_second = 60;

/**
* clone a tablet, further repair timeout.
*/
@ConfField(mutable = true, masterOnly = true)
public static long tablet_further_repair_timeout_second = 20 * 60;

/**
* clone a tablet, further repair max times.
*/
@ConfField(mutable = true, masterOnly = true)
public static int tablet_further_repair_max_times = 5;

/**
* the default slot number per path for hdd in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
Expand Down
65 changes: 55 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.catalog;

import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -114,28 +115,34 @@ public enum ReplicaStatus {
private long cooldownTerm = -1;

/*
* If set to true, with means this replica need to be repaired. explicitly.
* This can happen when this replica is created by a balance clone task, and
* when task finished, the version of this replica is behind the partition's visible version.
* So this replica need a further repair.
* If we do not do this, this replica will be treated as version stale, and will be removed,
* so that the balance task is failed, which is unexpected.
*
* furtherRepairSetTime set alone with needFurtherRepair.
* furtherRepairSetTime and leftFurtherRepairCount are set alone with needFurtherRepair.
* This is an insurance, in case that further repair task always fail. If 20 min passed
* since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false.
*/
private boolean needFurtherRepair = false;
private long furtherRepairSetTime = -1;
private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min
private int leftFurtherRepairCount = 0;

// During full clone, the replica's state is CLONE, it will not load the data.
// After full clone finished, even if the replica's version = partition's visible version,
//
// notice: furtherRepairWatermarkTxnTd is used to clone a replica, protected it from be removed.
//
private long furtherRepairWatermarkTxnTd = -1;

/* Decommission a backend B, steps are as follow:
* 1. wait peer backends catchup with B;
* 2. B change state to DECOMMISSION, set preWatermarkTxnId. B can load data now.
* 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now.
* 4. wait txn before postWatermarkTxnId finished, delete B.
*
* notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica.
*
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
Expand Down Expand Up @@ -263,15 +270,35 @@ public void setCooldownTerm(long cooldownTerm) {
}

public boolean needFurtherRepair() {
if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
return true;
}
return false;
return leftFurtherRepairCount > 0
&& System.currentTimeMillis() < furtherRepairSetTime
+ Config.tablet_further_repair_timeout_second * 1000;
}

public void setNeedFurtherRepair(boolean needFurtherRepair) {
this.needFurtherRepair = needFurtherRepair;
this.furtherRepairSetTime = System.currentTimeMillis();
if (needFurtherRepair) {
furtherRepairSetTime = System.currentTimeMillis();
leftFurtherRepairCount = Config.tablet_further_repair_max_times;
} else {
leftFurtherRepairCount = 0;
furtherRepairSetTime = -1;
}
}

public void incrFurtherRepairCount() {
leftFurtherRepairCount--;
}

public int getLeftFurtherRepairCount() {
return leftFurtherRepairCount;
}

public long getFurtherRepairWatermarkTxnTd() {
return furtherRepairWatermarkTxnTd;
}

public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) {
this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
}

// for compatibility
Expand Down Expand Up @@ -300,6 +327,7 @@ public synchronized void updateVersionWithFailedInfo(

public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion,
long updateTime) {
long oldLastFailedVersion = this.lastFailedVersion;
if (version != null) {
this.version = version;
}
Expand All @@ -324,6 +352,13 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer
if (this.lastSuccessVersion < this.version) {
this.lastSuccessVersion = this.version;
}
if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
this, oldLastFailedVersion);
} else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
this, oldLastFailedVersion);
}
}

/* last failed version: LFV
Expand Down Expand Up @@ -374,6 +409,8 @@ private void updateReplicaInfo(long newVersion,
return;
}

long oldLastFailedVersion = this.lastFailedVersion;

this.version = newVersion;
this.dataSize = newDataSize;
this.remoteDataSize = newRemoteDataSize;
Expand Down Expand Up @@ -427,6 +464,14 @@ private void updateReplicaInfo(long newVersion,
if (LOG.isDebugEnabled()) {
LOG.debug("after update {}", this.toString());
}

if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
this, oldLastFailedVersion);
} else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
this, oldLastFailedVersion);
}
}

public synchronized void updateLastFailedVersion(long lastFailedVersion) {
Expand Down
74 changes: 55 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public enum State {
private Replica tempSrcReplica = null;
private long destBackendId = -1;
private long destPathHash = -1;
private long destOldVersion = -1;
// for disk balance to set migration task's datadir
private String destPath = null;
private String errMsg = null;
Expand Down Expand Up @@ -912,28 +913,26 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
// if this is a balance task, or this is a repair task with
// REPLICA_MISSING/REPLICA_RELOCATING,
// we create a new replica with state CLONE
long replicaId = 0;
Replica replica = null;
if (tabletStatus == TabletStatus.REPLICA_MISSING
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
|| tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
Replica cloneReplica = new Replica(
replica = new Replica(
Env.getCurrentEnv().getNextId(), destBackendId,
-1 /* version */, schemaHash,
-1 /* data size */, -1, -1 /* row count */,
ReplicaState.CLONE,
committedVersion, /* use committed version as last failed version */
-1 /* last success version */);

LOG.info("create clone task to make new replica, tabletId={}, replicaId={}", tabletId,
cloneReplica.getId());
// addReplica() method will add this replica to tablet inverted index too.
tablet.addReplica(cloneReplica);
replicaId = cloneReplica.getId();
} else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
tablet.addReplica(replica);
} else {
// tabletStatus is VERSION_INCOMPLETE || NEED_FURTHER_REPAIR
Preconditions.checkState(type == Type.REPAIR, type);
// double check
Replica replica = tablet.getReplicaByBackendId(destBackendId);
replica = tablet.getReplicaByBackendId(destBackendId);
if (replica == null) {
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica does not exist on BE " + destBackendId);
}
Expand All @@ -942,17 +941,18 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. "
+ "current: " + replica.getPathHash() + ", scheduled: " + destPathHash);
}
replicaId = replica.getId();
}

TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort());

cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId,
replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
destOldVersion = replica.getVersion();
cloneTask.setPathHash(srcPathHash, destPathHash);
LOG.info("create clone task to repair replica, tabletId={}, replicaId={}", tabletId, replicaId);
LOG.info("create clone task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}",
tabletId, replica, visibleVersion, tabletStatus);

this.state = State.RUNNING;
return cloneTask;
Expand Down Expand Up @@ -1078,16 +1078,51 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
replica.setPathHash(reportedTablet.getPathHash());
}

if (this.type == Type.BALANCE) {
long partitionVisibleVersion = partition.getVisibleVersion();
if (replica.getVersion() < partitionVisibleVersion) {
// see comment 'needFurtherRepair' of Replica for explanation.
// no need to persist this info. If FE restart, just do it again.
replica.setNeedFurtherRepair(true);
if (type == Type.BALANCE) {
replica.setNeedFurtherRepair(true);
try {
long furtherRepairWatermarkTxnTd = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd);
LOG.info("new replica {} of tablet {} set further repair watermark id {}",
replica, tabletId, furtherRepairWatermarkTxnTd);
} catch (Exception e) {
LOG.warn("new replica {} set further repair watermark id failed", replica, e);
}
} else {
}

// isCatchup should check the txns during ReplicaState CLONE finished.
// Because when replica's state = CLONE, it will not load txns.
// Even if this replica version = partition visible version, but later if the txns during CLONE
// change from prepare to committed or visible, this replica will be fall behind and be removed
// in REDUNDANT detection.
//
boolean isCatchup = false;
if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) {
long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd();
if (furtherRepairWatermarkTxnTd < 0) {
isCatchup = true;
} else {
try {
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) {
isCatchup = true;
LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}",
replica, tabletId, furtherRepairWatermarkTxnTd);
}
} catch (Exception e) {
isCatchup = true;
}
}
}

replica.incrFurtherRepairCount();
if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
replica.setNeedFurtherRepair(false);
}
if (!replica.needFurtherRepair()) {
replica.setFurtherRepairWatermarkTxnTd(-1);
}

ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tblId, partitionId, indexId,
tabletId, destBackendId, replica.getId(),
Expand All @@ -1109,7 +1144,8 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
}

state = State.FINISHED;
LOG.info("clone finished: {}", this);
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}",
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
} finally {
olapTable.writeUnlock();
}
Expand Down