Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.thrift.TUniqueId;

import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -114,6 +115,14 @@ public enum ReplicaStatus {
private TUniqueId cooldownMetaId;
private long cooldownTerm = -1;

// A replica's version should increase monotonically,
// but a backend may be missing some versions due to disk failure or bugs.
// FE should detect this and mark the replica as missing versions.
// If the backend's reported version < FE's version, record the reported version as `regressiveVersion`,
// and if this persists for more than 5 minutes, FE should mark this replica as missing versions.
private long regressiveVersion = -1;
private long regressiveVersionTimestamp = 0;

/*
* This can happen when this replica is created by a balance clone task, and
* when task finished, the version of this replica is behind the partition's visible version.
Expand Down Expand Up @@ -435,9 +444,9 @@ private void updateReplicaInfo(long newVersion,

if (lastFailedVersion != this.lastFailedVersion) {
// Case 2:
if (lastFailedVersion > this.lastFailedVersion) {
if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) {
this.lastFailedVersion = lastFailedVersion;
this.lastFailedTimestamp = System.currentTimeMillis();
this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L;
}

this.lastSuccessVersion = this.version;
Expand Down Expand Up @@ -506,10 +515,6 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) {
return true;
}

/**
 * Overwrites the last failed version of this replica.
 *
 * <p>Only {@code lastFailedVersion} is assigned here; no other field is modified by this setter.
 *
 * @param lastFailedVersion the value to store as the last failed version
 */
public void setLastFailedVersion(long lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
}

/**
 * Sets the state of this replica.
 *
 * @param replicaState the new state to store in {@code state}
 */
public void setState(ReplicaState replicaState) {
this.state = replicaState;
}
Expand All @@ -534,6 +539,25 @@ public void setVersionCount(long versionCount) {
this.versionCount = versionCount;
}

/**
 * Decides whether a backend-reported version that is lower than this replica's
 * {@code version} has persisted long enough to be treated as a regression.
 *
 * <p>A report that is not lower than {@code version} clears the tracked regression
 * state and yields {@code false}. A lower report is remembered together with the time
 * it was first seen; the method yields {@code true} only once the same lower version
 * has been tracked for at least five minutes. Reporting a different lower version
 * restarts the timer. When the debug point
 * {@code Replica.regressive_version_immediately} is enabled, any lower report yields
 * {@code true} at once and the tracked state is left untouched.
 *
 * <p>Note that this method mutates {@code regressiveVersion} and
 * {@code regressiveVersionTimestamp} as a side effect.
 *
 * @param newVersion the version reported by the backend
 * @return {@code true} if the replica should be considered as having regressed
 */
public boolean checkVersionRegressive(long newVersion) {
    if (newVersion < version) {
        // Test hook: skip the grace period entirely.
        if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) {
            return true;
        }

        // First sighting of this particular lower version: start (or restart) the clock.
        if (regressiveVersion != newVersion) {
            regressiveVersion = newVersion;
            regressiveVersionTimestamp = System.currentTimeMillis();
        }

        final long gracePeriodMs = 5 * 60 * 1000L;
        final long elapsedMs = System.currentTimeMillis() - regressiveVersionTimestamp;
        return elapsedMs >= gracePeriodMs;
    }

    // Backend has caught up (or was never behind): forget any tracked regression.
    regressiveVersion = -1;
    regressiveVersionTimestamp = -1;
    return false;
}

@Override
public String toString() {
StringBuilder strBuffer = new StringBuilder("[replicaId=");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,22 @@ private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) {
if (backendTabletInfo.getVersion() > versionInFe) {
// backend replica's version is larger or newer than replica in FE, sync it.
return true;
} else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) {
} else if (versionInFe == backendTabletInfo.getVersion()) {
// backend replica's version is equal to replica in FE, but replica in FE is bad,
// while backend replica is good, sync it
return true;
if (replicaInFe.isBad()) {
return true;
}

// FE's replica last failed version > partition's committed version.
// This can occur when BE reports a missing version: FE sets last failed version = visible version + 1,
// so the last failed version may be greater than the partition's committed version.
//
// But the partition is not available here, so we just check lastFailedVersion == version + 1.
// In ReportHandler.sync, we will check again whether last failed version > partition's committed version.
if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
return true;
}
}

return false;
Expand Down Expand Up @@ -501,6 +513,12 @@ private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo
// so we only return true if version_miss is true.
return true;
}

// backend versions regressive due to bugs
if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,13 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)

replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(),
reportedTablet.getDataSize(), reportedTablet.getRowCount());
if (replica.getLastFailedVersion() > partition.getCommittedVersion()
&& reportedTablet.getVersion() >= partition.getCommittedVersion()
//&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss()
&& !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) {
LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId);
replica.updateLastFailedVersion(-1L);
}
if (reportedTablet.isSetPathHash()) {
replica.setPathHash(reportedTablet.getPathHash());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
finishRecoverTablet(task);
break;
case ALTER:
finishAlterTask(task);
finishAlterTask(task, request);
break;
case ALTER_INVERTED_INDEX:
finishAlterInvertedIndexTask(task, request);
Expand Down Expand Up @@ -575,7 +575,7 @@ public TMasterResult report(TReportRequest request) throws TException {
return reportHandler.handleReport(request);
}

private void finishAlterTask(AgentTask task) {
private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
AlterReplicaTask alterTask = (AlterReplicaTask) task;
try {
if (alterTask.getJobType() == JobType.ROLLUP) {
Expand All @@ -584,6 +584,11 @@ private void finishAlterTask(AgentTask task) {
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
}
alterTask.setFinished(true);
if (request.isSetReportVersion()) {
long reportVersion = request.getReportVersion();
Env.getCurrentSystemInfo().updateBackendReportVersion(
task.getBackendId(), reportVersion, task.getDbId(), task.getTableId());
}
} catch (MetaNotFoundException e) {
LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ private static void diffResource(List<TStorageResource> storageResourcesInBe, Li
}
}

private static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
// public for fe ut
public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
long start = System.currentTimeMillis();
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
backendId, backendTablets.size(), backendReportVersion);
Expand Down Expand Up @@ -607,6 +608,11 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}

if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
break;
}

try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
Expand Down Expand Up @@ -660,14 +666,25 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
continue;
}

if (metaVersion < backendVersion
|| (metaVersion == backendVersion && replica.isBad())) {

if (backendReportVersion < Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId)) {
continue;
boolean needSync = false;
if (metaVersion < backendVersion) {
needSync = true;
} else if (metaVersion == backendVersion) {
if (replica.isBad()) {
needSync = true;
}
if (replica.getVersion() >= partition.getCommittedVersion()
&& replica.getLastFailedVersion() > partition.getCommittedVersion()) {
LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed"
+ " version change to -1 because last failed version > replica's committed"
+ " version {}",
replica, tabletId, backendId, dbId, partition.getCommittedVersion());
replica.updateLastFailedVersion(-1L);
needSync = true;
}
}

if (needSync) {
// happens when
// 1. PUSH finished in BE but failed or not yet report to FE
// 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
Expand Down Expand Up @@ -1048,18 +1065,25 @@ private static void handleRecoverTablet(ListMultimap<Long, Long> tabletRecoveryM
break;
}

if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss())
|| replica.checkVersionRegressive(tTabletInfo.getVersion())) {
// If the original last failed version is larger than 0, do not change it.
// Otherwise, set the last failed version to the replica's version + 1,
// because the last failed version should always be larger than the replica's version.
long newLastFailedVersion = replica.getLastFailedVersion();
if (newLastFailedVersion < 0) {
newLastFailedVersion = replica.getVersion() + 1;
replica.updateLastFailedVersion(newLastFailedVersion);
LOG.warn("set missing version for replica {} of tablet {} on backend {}, "
+ "version in fe {}, version in be {}, be missing {}",
replica.getId(), tabletId, backendId, replica.getVersion(),
tTabletInfo.getVersion(), tTabletInfo.isVersionMiss());
}
replica.updateLastFailedVersion(newLastFailedVersion);
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
break;
}

break;
}
}
} finally {
Expand Down
Loading