Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce
task.getSignature(), replica, task.getVersion());
boolean versionChanged = false;
if (replica.getVersion() < task.getVersion()) {
replica.updateVersionInfo(task.getVersion(), replica.getDataSize(), replica.getRemoteDataSize(),
replica.getRowCount());
replica.updateVersion(task.getVersion());
versionChanged = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1765,8 +1765,7 @@ private Status allTabletCommitted(boolean isReplay) {
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) {
replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(),
replica.getRemoteDataSize(), replica.getRowCount());
replica.updateVersion(part.getVisibleVersion());
}
}
}
Expand Down
44 changes: 18 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,26 @@ public long getDataSize() {
return dataSize;
}

public void setDataSize(long dataSize) {
this.dataSize = dataSize;
}

public long getRemoteDataSize() {
return remoteDataSize;
}

public void setRemoteDataSize(long remoteDataSize) {
this.remoteDataSize = remoteDataSize;
}

public long getRowCount() {
return rowCount;
}

public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}

public long getLastFailedVersion() {
return lastFailedVersion;
}
Expand Down Expand Up @@ -311,28 +323,13 @@ public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) {
this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
}

// for compatibility
public synchronized void updateStat(long dataSize, long rowNum) {
this.dataSize = dataSize;
this.rowCount = rowNum;
}

public synchronized void updateStat(long dataSize, long remoteDataSize, long rowNum, long versionCount) {
this.dataSize = dataSize;
this.remoteDataSize = remoteDataSize;
this.rowCount = rowNum;
this.versionCount = versionCount;
}

public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize,
long newRowCount) {
updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize,
newRowCount);
public synchronized void updateVersion(long newVersion) {
updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion);
}

public synchronized void updateVersionWithFailedInfo(
public synchronized void updateVersionWithFailed(
long newVersion, long lastFailedVersion, long lastSuccessVersion) {
updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount);
updateReplicaVersion(newVersion, lastFailedVersion, lastSuccessVersion);
}

public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion,
Expand Down Expand Up @@ -395,9 +392,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer
* the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number.
* We just reset the LFV(hash) to recovery this replica.
*/
private void updateReplicaInfo(long newVersion,
long lastFailedVersion, long lastSuccessVersion,
long newDataSize, long newRemoteDataSize, long newRowCount) {
private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("before update: {}", this.toString());
}
Expand All @@ -422,9 +417,6 @@ private void updateReplicaInfo(long newVersion,
long oldLastFailedVersion = this.lastFailedVersion;

this.version = newVersion;
this.dataSize = newDataSize;
this.remoteDataSize = newRemoteDataSize;
this.rowCount = newRowCount;

// just check it
if (lastSuccessVersion <= this.version) {
Expand Down Expand Up @@ -485,7 +477,7 @@ private void updateReplicaInfo(long newVersion,
}

public synchronized void updateLastFailedVersion(long lastFailedVersion) {
updateReplicaInfo(this.version, lastFailedVersion, this.lastSuccessVersion, dataSize, remoteDataSize, rowCount);
updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ private void updateTabletStat(Long beId, TTabletStatResult result) {
if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId);
if (replica != null) {
replica.updateStat(stat.getDataSize(), stat.getRemoteDataSize(), stat.getRowNum(),
stat.getVersionCount());
replica.setDataSize(stat.getDataSize());
replica.setRemoteDataSize(stat.getRemoteDataSize());
replica.setRowCount(stat.getRowNum());
replica.setVersionCount(stat.getVersionCount());
}
}
}
Expand All @@ -161,7 +163,8 @@ private void updateTabletStat(Long beId, TTabletStatResult result) {
continue;
}
// TODO(cmy) no db lock protected. I think it is ok even we get wrong row num
replica.updateStat(entry.getValue().getDataSize(), entry.getValue().getRowNum());
replica.setDataSize(entry.getValue().getDataSize());
replica.setRowCount(entry.getValue().getRowNum());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,10 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
"replica does not exist. backend id: " + destBackendId);
}

replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(),
reportedTablet.getRemoteDataSize(), reportedTablet.getRowCount());
replica.updateVersion(reportedTablet.getVersion());
replica.setDataSize(reportedTablet.getDataSize());
replica.setRemoteDataSize(reportedTablet.getRemoteDataSize());
replica.setRowCount(reportedTablet.getRowCount());
if (replica.getLastFailedVersion() > partition.getCommittedVersion()
&& reportedTablet.getVersion() >= partition.getCommittedVersion()
//&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,11 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info
Tablet tablet = materializedIndex.getTablet(info.getTabletId());
Replica replica = tablet.getReplicaByBackendId(info.getBackendId());
Preconditions.checkNotNull(replica, info);
replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRemoteDataSize(), info.getRowCount());
replica.updateVersionWithFailed(info.getVersion(), info.getLastFailedVersion(),
info.getLastSuccessVersion());
replica.setDataSize(info.getDataSize());
replica.setRemoteDataSize(info.getRemoteDataSize());
replica.setRowCount(info.getRowCount());
replica.setBad(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,10 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
// happens when
// 1. PUSH finished in BE but failed or not yet report to FE
// 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
replica.updateVersionInfo(backendVersion, dataSize, remoteDataSize, rowCount);
replica.updateVersion(backendVersion);
replica.setDataSize(dataSize);
replica.setRemoteDataSize(remoteDataSize);
replica.setRowCount(rowCount);

if (replica.getLastFailedVersion() < 0) {
if (replica.setBad(false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,7 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
lastFailedVersion = newCommitVersion;
}
}
replica.updateVersionWithFailedInfo(newVersion, lastFailedVersion, lastSuccessVersion);
replica.updateVersionWithFailed(newVersion, lastFailedVersion, lastSuccessVersion);
}
}
} // end for indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ public void testSchemaChange1() throws Exception {
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
shadowReplica.getDataSize(),
shadowReplica.getRemoteDataSize(),
shadowReplica.getRowCount());
shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}

Expand Down Expand Up @@ -301,10 +298,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception {
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
shadowReplica.getDataSize(),
shadowReplica.getRemoteDataSize(),
shadowReplica.getRowCount());
shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ public void testSchemaChange1() throws Exception {
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(),
shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount());
shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}

Expand Down Expand Up @@ -296,8 +295,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception {
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(),
shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount());
shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void testShowReplicaDistribution() throws Exception {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
replica.updateStat(1024, 2);
replica.setDataSize(1024L);
replica.setRowCount(2L);
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void getMethodTest() {
long newVersion = version + 1;
long newDataSize = dataSize + 100;
long newRowCount = rowCount + 10;
replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount);
replica.updateVersion(newVersion);
replica.setDataSize(newDataSize);
replica.setRowCount(newRowCount);
Assert.assertEquals(newVersion, replica.getVersion());
Assert.assertEquals(newDataSize, replica.getDataSize());
Assert.assertEquals(newRowCount, replica.getRowCount());
Expand Down Expand Up @@ -132,14 +134,14 @@ public void testSerialization() throws Exception {
public void testUpdateVersion1() {
Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 3);
// new version is little than original version, it is invalid the version will not update
originalReplica.updateVersionInfo(2, 100, 0, 78);
originalReplica.updateVersion(2L);
Assert.assertEquals(3, originalReplica.getVersion());
}

@Test
public void testUpdateVersion2() {
Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 0);
originalReplica.updateVersionInfo(3, 100, 0, 78);
originalReplica.updateVersion(3L);
// if new version >= current version and last success version <= new version, then last success version should be updated
Assert.assertEquals(3, originalReplica.getLastSuccessVersion());
Assert.assertEquals(3, originalReplica.getVersion());
Expand All @@ -155,15 +157,15 @@ public void testUpdateVersion3() {
Assert.assertEquals(8, originalReplica.getLastFailedVersion());

// update last success version 10
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
originalReplica.getLastFailedVersion(),
10);
Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
Assert.assertEquals(3, originalReplica.getVersion());
Assert.assertEquals(8, originalReplica.getLastFailedVersion());

// update version to 8, the last success version and version should be 10
originalReplica.updateVersionInfo(8, 100, 0, 78);
originalReplica.updateVersion(8);
Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
Assert.assertEquals(10, originalReplica.getVersion());
Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
Expand All @@ -175,7 +177,7 @@ public void testUpdateVersion3() {
Assert.assertEquals(12, originalReplica.getLastFailedVersion());

// update last success version to 15
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
originalReplica.getLastFailedVersion(),
15);
Assert.assertEquals(15, originalReplica.getLastSuccessVersion());
Expand All @@ -189,13 +191,13 @@ public void testUpdateVersion3() {
Assert.assertEquals(18, originalReplica.getLastFailedVersion());

// update version to 17 then version and success version is 17
originalReplica.updateVersionInfo(17, 100, 0, 78);
originalReplica.updateVersion(17L);
Assert.assertEquals(17, originalReplica.getLastSuccessVersion());
Assert.assertEquals(17, originalReplica.getVersion());
Assert.assertEquals(18, originalReplica.getLastFailedVersion());

// update version to 18, then version and last success version should be 18 and failed version should be -1
originalReplica.updateVersionInfo(18, 100, 0, 78);
originalReplica.updateVersion(18L);
Assert.assertEquals(18, originalReplica.getLastSuccessVersion());
Assert.assertEquals(18, originalReplica.getVersion());
Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception {
Lists.newArrayList(tablet.getReplicas()).forEach(
replica -> {
if (replica.getBackendId() == backends.get(1).getId()) {
replica.updateStat(totalCapacity / 4, 1);
replica.setDataSize(totalCapacity / 4);
replica.setRowCount(1L);
tablet.deleteReplica(replica);
replica.setBackendId(backends.get(0).getId());
replica.setPathHash(diskInfo0.getPathHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex
replica.setPathHash(beIds.get(i));
if (replicaSizes != null) {
// for disk rebalancer, every beId corresponding to a replicaSize
replica.updateStat(replicaSizes.get(i), 0);
replica.setDataSize(replicaSizes.get(i));
}
// isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
tablet.addReplica(replica, true);
Expand Down Expand Up @@ -164,7 +164,8 @@ public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, in
for (Tablet tablet : idx.getTablets()) {
long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew));
for (Replica replica : tablet.getReplicas()) {
replica.updateStat(tabletSize, 1000L);
replica.setDataSize(tabletSize);
replica.setRowCount(1000L);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private TableInfo prepareTableForTest(String tableName) throws Exception {
long visibleVersion = 2L;
partition.updateVisibleVersion(visibleVersion);
partition.setNextVersion(visibleVersion + 1);
tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
tablet.getReplicas().forEach(replica -> replica.updateVersion(visibleVersion));

Replica replica = tablet.getReplicas().iterator().next();
Assertions.assertEquals(visibleVersion, replica.getVersion());
Expand Down
Loading