diff --git a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java index 06bb9d34f411ac..e0cf3cb88a663f 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java @@ -363,7 +363,7 @@ private void processAddRollup(AddRollupClause alterClause, Database db, OlapTabl // has to set failed verison and version hash here, because there will be no load after rollup // so that if not set here, last failed version will not be set rollupReplica.updateVersionInfo(rollupReplica.getVersion(), rollupReplica.getVersionHash(), - partition.getCurrentVersion(), partition.getCurrentVersionHash(), + partition.getCommittedVersion(), partition.getCommittedVersionHash(), rollupReplica.getLastSuccessVersion(), rollupReplica.getLastSuccessVersionHash()); if (isRestore) { rollupReplica.setState(ReplicaState.NORMAL); diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/src/main/java/org/apache/doris/alter/RollupJob.java index 5823497a41d5e8..38d1cd5d3183d6 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJob.java @@ -229,7 +229,7 @@ public synchronized void updateRollupReplicaInfo(long partitionId, long indexId, throw new MetaNotFoundException("cannot find replica in tablet[" + tabletId + "], backend[" + backendId + "]"); } - replica.updateInfo(version, versionHash, dataSize, rowCount); + replica.updateVersionInfo(version, versionHash, dataSize, rowCount); LOG.debug("rollup replica[{}] info updated. schemaHash:{}", replica.getId(), schemaHash); } @@ -587,7 +587,7 @@ public synchronized void handleFinishedReplica(AgentTask task, TTabletInfo finis // yiguolei: not check version here because the replica's first version will be set by rollup job // the version is not set now // the finish task thread doesn't own db lock here, maybe a bug? - rollupReplica.updateInfo(version, versionHash, dataSize, rowCount); + rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount); setReplicaFinished(partitionId, rollupReplicaId); rollupReplica.setState(ReplicaState.NORMAL); @@ -735,7 +735,7 @@ public int tryFinishJob() { // 3. add rollup finished version to base index MaterializedIndex baseIndex = partition.getIndex(baseIndexId); if (baseIndex != null) { - baseIndex.setRollupIndexInfo(rollupIndexId, partition.getCommittedVersion()); + baseIndex.setRollupIndexInfo(rollupIndexId, partition.getVisibleVersion()); } Preconditions.checkState(partition.getState() == PartitionState.ROLLUP); partition.setState(PartitionState.NORMAL); @@ -840,7 +840,7 @@ public void replayFinishing(Database db) { MaterializedIndex baseIndex = partition.getIndex(baseIndexId); if (baseIndex != null) { - baseIndex.setRollupIndexInfo(rollupIndexId, partition.getCommittedVersion()); + baseIndex.setRollupIndexInfo(rollupIndexId, partition.getVisibleVersion()); } partition.createRollupIndex(rollupIndex); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index 3bbf64c9a714aa..b34e689b97f11c 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -599,7 +599,7 @@ public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long dataSize = finishTabletInfo.getData_size(); long rowCount = finishTabletInfo.getRow_count(); // do not need check version > replica.getVersion, because the new replica's version is first set by sc - replica.updateInfo(version, versionHash, dataSize, rowCount); + replica.updateVersionInfo(version, versionHash, dataSize, rowCount); } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/src/main/java/org/apache/doris/backup/BackupJob.java index 2af98ad1a95ca4..e0586930aa27ab 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java @@ -17,6 +17,16 @@ package org.apache.doris.backup; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.base.Strings; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.doris.analysis.TableRef; import org.apache.doris.backup.Status.ErrCode; import org.apache.doris.catalog.BrokerMgr.BrokerAddress; @@ -40,17 +50,6 @@ import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTaskType; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.base.Strings; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -377,25 +376,25 @@ private void prepareAndSendSnapshotTask() { // snapshot partitions for (Partition partition : partitions) { - long committedVersion = partition.getCommittedVersion(); - long committedVersionHash = partition.getCommittedVersionHash(); + long visibleVersion = partition.getVisibleVersion(); + long visibleVersionHash = partition.getVisibleVersionHash(); List indexes = partition.getMaterializedIndices(); for (MaterializedIndex index : indexes) { int schemaHash = tbl.getSchemaHashByIndexId(index.getId()); List tablets = index.getTablets(); for (Tablet tablet : tablets) { - Replica replica = chooseReplica(tablet, committedVersion, committedVersionHash); + Replica replica = chooseReplica(tablet, visibleVersion, visibleVersionHash); if (replica == null) { status = new Status(ErrCode.COMMON_ERROR, "faild to choose replica to make snapshot for tablet " + tablet.getId() - + ". committed version: " + committedVersion - + ", committed version hash: " + committedVersionHash); + + ". visible version: " + visibleVersion + + ", visible version hash: " + visibleVersionHash); return; } SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), tablet.getId(), jobId, dbId, tbl.getId(), partition.getId(), index.getId(), tablet.getId(), - committedVersion, committedVersionHash, + visibleVersion, visibleVersionHash, schemaHash, timeoutMs, false /* not restore task */); batchTask.addTask(task); unfinishedTaskIds.add(tablet.getId()); @@ -403,7 +402,7 @@ private void prepareAndSendSnapshotTask() { } LOG.info("snapshot for partition {}, version: {}, version hash: {}", - partition.getId(), committedVersion, committedVersionHash); + partition.getId(), visibleVersion, visibleVersionHash); } } diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java index ac4792dd8fd996..c0038f0db4aeab 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -242,8 +242,8 @@ public static BackupJobInfo fromCatalog(long backupTime, String label, String db BackupPartitionInfo partitionInfo = new BackupPartitionInfo(); partitionInfo.id = partition.getId(); partitionInfo.name = partition.getName(); - partitionInfo.version = partition.getCommittedVersion(); - partitionInfo.versionHash = partition.getCommittedVersionHash(); + partitionInfo.version = partition.getVisibleVersion(); + partitionInfo.versionHash = partition.getVisibleVersionHash(); tableInfo.partitions.put(partitionInfo.name, partitionInfo); // indexes for (MaterializedIndex index : partition.getMaterializedIndices()) { diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob_D.java b/fe/src/main/java/org/apache/doris/backup/BackupJob_D.java index 1ec133dbf8898e..c642efb25215bd 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJob_D.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJob_D.java @@ -365,8 +365,8 @@ private void getMeta(Database db, Map> pathToWr // save version info partitionIdToVersionInfo.put(partitionId, - new Pair(partition.getCommittedVersion(), - partition.getCommittedVersionHash())); + new Pair(partition.getVisibleVersion(), + partition.getVisibleVersionHash())); } } else { Preconditions.checkState(partitionIds.size() == 1); @@ -374,8 +374,8 @@ private void getMeta(Database db, Map> pathToWr Partition partition = olapTable.getPartition(partitionId); // save version info partitionIdToVersionInfo.put(partitionId, - new Pair(partition.getCommittedVersion(), - partition.getCommittedVersionHash())); + new Pair(partition.getVisibleVersion(), + partition.getVisibleVersionHash())); } } } // end for tables diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index 80cb86e6ea90ec..6ac67862bed878 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -708,7 +708,7 @@ private void checkAndPrepareMeta() { SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature, jobId, db.getId(), tbl.getId(), part.getId(), index.getId(), tablet.getId(), - part.getCommittedVersion(), part.getCommittedVersionHash(), + part.getVisibleVersion(), part.getVisibleVersionHash(), tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, true /* is restore task*/); batchTask.addTask(task); @@ -763,8 +763,8 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT } // save version info for creating replicas - long committedVersion = remotePart.getCommittedVersion(); - long committedVersionHash = remotePart.getCommittedVersionHash(); + long visibleVersion = remotePart.getVisibleVersion(); + long visibleVersionHash = remotePart.getVisibleVersionHash(); // tablets for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices()) { @@ -789,7 +789,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT for (Long beId : beIds) { long newReplicaId = catalog.getNextId(); Replica newReplica = new Replica(newReplicaId, beId, ReplicaState.NORMAL, - committedVersion, committedVersionHash); + visibleVersion, visibleVersionHash); newTablet.addReplica(newReplica, true /* is restore */); } } @@ -1141,15 +1141,15 @@ private Status allTabletCommitted(boolean isReplay) { } // update partition committed version - part.updateCommitVersionAndVersionHash(entry.getValue().first, entry.getValue().second); + part.updateVisibleVersionAndVersionHash(entry.getValue().first, entry.getValue().second); // we also need to update the replica version of these overwritten restored partitions for (MaterializedIndex idx : part.getMaterializedIndices()) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { - if (!replica.checkVersionCatchUp(part.getCommittedVersion(), - part.getCommittedVersionHash())) { - replica.updateInfo(part.getCommittedVersion(), part.getCommittedVersionHash(), + if (!replica.checkVersionCatchUp(part.getVisibleVersion(), + part.getVisibleVersionHash())) { + replica.updateVersionInfo(part.getVisibleVersion(), part.getVisibleVersionHash(), replica.getDataSize(), replica.getRowCount()); } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index e79aa1fa4e0a9f..d95baeccb625ef 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3097,10 +3097,10 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long // version and version hash if (versionInfo != null) { - partition.updateCommitVersionAndVersionHash(versionInfo.first, versionInfo.second); + partition.updateVisibleVersionAndVersionHash(versionInfo.first, versionInfo.second); } - long version = partition.getCommittedVersion(); - long versionHash = partition.getCommittedVersionHash(); + long version = partition.getVisibleVersion(); + long versionHash = partition.getVisibleVersionHash(); for (Map.Entry entry : indexMap.entrySet()) { long indexId = entry.getKey(); @@ -3640,7 +3640,7 @@ public static void getDdlStmt(Table table, List createTableStmt, List createTableStmt, List 0) { sb.append(", \"replication_num\" = \"").append(replicationNum).append("\""); diff --git a/fe/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java b/fe/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java index 4aa9bb933c1cfe..70c94c728b072c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java +++ b/fe/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java @@ -45,7 +45,10 @@ public synchronized long getNextId() { return nextId++; } else { batchEndId = batchEndId + BATCH_ID_INTERVAL; - editLog.logSaveNextId(batchEndId); + if (editLog != null) { + // add this check just for unit test + editLog.logSaveNextId(batchEndId); + } return nextId++; } } diff --git a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 4b5cf2c2d4aef5..e835ca5c26b186 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -76,7 +76,7 @@ private static List> getTabletStatus(String dbName, String tblName, for (String partName : partitions) { Partition partition = olapTable.getPartition(partName); - long committedVersion = partition.getCommittedVersion(); + long visibleVersion = partition.getVisibleVersion(); short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId()); for (MaterializedIndex index : partition.getMaterializedIndices()) { @@ -92,7 +92,7 @@ private static List> getTabletStatus(String dbName, String tblName, if (be == null || !be.isAvailable()) { status = ReplicaStatus.DEAD; } else { - if (replica.getVersion() < committedVersion + if (replica.getVersion() < visibleVersion || replica.getLastFailedVersion() > 0) { status = ReplicaStatus.VERSION_ERROR; } @@ -108,7 +108,7 @@ private static List> getTabletStatus(String dbName, String tblName, row.add(String.valueOf(replica.getVersion())); row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); - row.add(String.valueOf(committedVersion)); + row.add(String.valueOf(visibleVersion)); row.add(String.valueOf(replica.getVersionCount())); row.add(replica.getState().name()); row.add(status.name()); diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index ef7acf9e568fa6..d2483f1e8f6fab 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -322,7 +322,7 @@ public Status resetIdsForRestore(Catalog catalog, Database db, int restoreReplic for (Long beId : beIds) { long newReplicaId = catalog.getNextId(); Replica replica = new Replica(newReplicaId, beId, ReplicaState.NORMAL, - partition.getCommittedVersion(), partition.getCommittedVersionHash()); + partition.getVisibleVersion(), partition.getVisibleVersionHash()); newTablet.addReplica(replica, true /* is restore */); } } @@ -586,8 +586,8 @@ public AlterTableStmt toAddPartitionStmt(String dbName, String partitionName) { Partition partition = nameToPartition.get(partitionName); Map properties = Maps.newHashMap(); - long version = partition.getCommittedVersion(); - long versionHash = partition.getCommittedVersionHash(); + long version = partition.getVisibleVersion(); + long versionHash = partition.getVisibleVersionHash(); properties.put(PropertyAnalyzer.PROPERTIES_VERSION_INFO, version + "," + versionHash); properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, String.valueOf(partitionInfo.getReplicationNum(partition.getId()))); @@ -624,8 +624,8 @@ public CreateTableStmt toCreateTableStmt(String dbName) { // and partition version info here for non-partitioned table Partition partition = getPartition(name); Preconditions.checkNotNull(partition); - long version = partition.getCommittedVersion(); - long versionHash = partition.getCommittedVersionHash(); + long version = partition.getVisibleVersion(); + long versionHash = partition.getVisibleVersionHash(); String versionProp = Joiner.on(",").join(version, versionHash); properties.put(PropertyAnalyzer.PROPERTIES_VERSION_INFO, versionProp); } diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java index f5c85f9a3982d8..4359f68211c72a 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java @@ -51,13 +51,19 @@ public enum PartitionState { private MaterializedIndex baseIndex; private Map idToRollupIndex; - private long committedVersion; - private long committedVersionHash; + /* + * committed version(hash): after txn is committed, set committed version(hash) + * visible version(hash): after txn is published, set visible version + * next version(hash): next version is set after finished committing, it should equals to committed version + 1 + */ + + // not have committedVersion because committedVersion = nextVersion - 1 + private long committedVersionHash; + private long visibleVersion; + private long visibleVersionHash; private long nextVersion; private long nextVersionHash; - // not have currentVersion because currentVersion = nextVersion - 1 - private long currentVersionHash; private DistributionInfo distributionInfo; @@ -74,13 +80,14 @@ public Partition(long id, String name, this.baseIndex = baseIndex; this.idToRollupIndex = new HashMap(); - this.committedVersion = PARTITION_INIT_VERSION; - this.committedVersionHash = PARTITION_INIT_VERSION_HASH; - this.distributionInfo = distributionInfo; + this.visibleVersion = PARTITION_INIT_VERSION; + this.visibleVersionHash = PARTITION_INIT_VERSION_HASH; // PARTITION_INIT_VERSION == 1, so the first load version is 2 !!! this.nextVersion = PARTITION_INIT_VERSION + 1; this.nextVersionHash = Util.generateVersionHash(); - this.currentVersionHash = PARTITION_INIT_VERSION_HASH; + this.committedVersionHash = PARTITION_INIT_VERSION_HASH; + + this.distributionInfo = distributionInfo; } public void setIdForRestore(long id) { @@ -103,30 +110,30 @@ public void setState(PartitionState state) { this.state = state; } - public void updateCommitVersionAndVersionHash(long committedVersion, long committedVersionHash) { - this.committedVersion = committedVersion; - this.committedVersionHash = committedVersionHash; + public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionHash = visibleVersionHash; // if it is upgrade from old palo cluster, then should update next version info if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_45) { - // the partition is created and not import any data - if (committedVersion == PARTITION_INIT_VERSION + 1 && committedVersionHash == PARTITION_INIT_VERSION_HASH) { + // the partition is created and not import any data + if (visibleVersion == PARTITION_INIT_VERSION + 1 && visibleVersionHash == PARTITION_INIT_VERSION_HASH) { this.nextVersion = PARTITION_INIT_VERSION + 1; this.nextVersionHash = Util.generateVersionHash(); - this.currentVersionHash = PARTITION_INIT_VERSION_HASH; + this.committedVersionHash = PARTITION_INIT_VERSION_HASH; } else { - this.nextVersion = committedVersion + 1; + this.nextVersion = visibleVersion + 1; this.nextVersionHash = Util.generateVersionHash(); - this.currentVersionHash = committedVersionHash; + this.committedVersionHash = visibleVersionHash; } } } - public long getCommittedVersion() { - return committedVersion; + public long getVisibleVersion() { + return visibleVersion; } - public long getCommittedVersionHash() { - return committedVersionHash; + public long getVisibleVersionHash() { + return visibleVersionHash; } public PartitionState getState() { @@ -161,17 +168,17 @@ public long getNextVersionHash() { return nextVersionHash; } - public void setNextVersionHash(long nextVersionHash, long currentVersionHash) { - this.currentVersionHash = currentVersionHash; + public void setNextVersionHash(long nextVersionHash, long committedVersionHash) { this.nextVersionHash = nextVersionHash; + this.committedVersionHash = committedVersionHash; } - public long getCurrentVersion() { + public long getCommittedVersion() { return Math.max(this.nextVersion - 1, 2); } - public long getCurrentVersionHash() { - return currentVersionHash; + public long getCommittedVersionHash() { + return committedVersionHash; } public List getRollupIndices() { @@ -224,12 +231,12 @@ public void write(DataOutput out) throws IOException { } } - out.writeLong(committedVersion); - out.writeLong(committedVersionHash); + out.writeLong(visibleVersion); + out.writeLong(visibleVersionHash); out.writeLong(nextVersion); out.writeLong(nextVersionHash); - out.writeLong(currentVersionHash); + out.writeLong(committedVersionHash); Text.writeString(out, distributionInfo.getType().name()); distributionInfo.write(out); @@ -250,22 +257,22 @@ public void readFields(DataInput in) throws IOException { idToRollupIndex.put(rollupTable.getId(), rollupTable); } - committedVersion = in.readLong(); - committedVersionHash = in.readLong(); + visibleVersion = in.readLong(); + visibleVersionHash = in.readLong(); if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) { nextVersion = in.readLong(); nextVersionHash = in.readLong(); - currentVersionHash = in.readLong(); + committedVersionHash = in.readLong(); } else { // the partition is created and not import any data - if (committedVersion == PARTITION_INIT_VERSION + 1 && committedVersionHash == PARTITION_INIT_VERSION_HASH) { + if (visibleVersion == PARTITION_INIT_VERSION + 1 && visibleVersionHash == PARTITION_INIT_VERSION_HASH) { this.nextVersion = PARTITION_INIT_VERSION + 1; this.nextVersionHash = Util.generateVersionHash(); - this.currentVersionHash = PARTITION_INIT_VERSION_HASH; + this.committedVersionHash = PARTITION_INIT_VERSION_HASH; } else { - this.nextVersion = committedVersion + 1; + this.nextVersion = visibleVersion + 1; this.nextVersionHash = Util.generateVersionHash(); - this.currentVersionHash = committedVersionHash; + this.committedVersionHash = visibleVersionHash; } } DistributionInfoType distriType = DistributionInfoType.valueOf(Text.readString(in)); @@ -302,8 +309,8 @@ public boolean equals(Object obj) { } } - return (committedVersion == partition.committedVersion) - && (committedVersionHash == partition.committedVersionHash) + return (visibleVersion == partition.visibleVersion) + && (visibleVersionHash == partition.visibleVersionHash) && (baseIndex.equals(partition.baseIndex) && distributionInfo.eqauls(partition.distributionInfo)); } @@ -326,8 +333,8 @@ public String toString() { } } - buffer.append("committedVersion: ").append(committedVersion).append("; "); - buffer.append("committedVersionHash: ").append(committedVersionHash).append("; "); + buffer.append("committedVersion: ").append(visibleVersion).append("; "); + buffer.append("committedVersionHash: ").append(visibleVersionHash).append("; "); buffer.append("distribution_info.type: ").append(distributionInfo.getType().name()).append("; "); buffer.append("distribution_info: ").append(distributionInfo.toString()); diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index 314516db548431..f60460572fb78b 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -166,7 +166,7 @@ public synchronized void updateStat(long dataSize, long rowNum) { this.rowCount = rowNum; } - public synchronized void updateInfo(long newVersion, long newVersionHash, long newDataSize, long newRowCount) { + public synchronized void updateVersionInfo(long newVersion, long newVersionHash, long newDataSize, long newRowCount) { updateReplicaInfo(newVersion, newVersionHash, this.lastFailedVersion, this.lastFailedVersionHash, this.lastSuccessVersion, this.lastSuccessVersionHash, newDataSize, newRowCount); } diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 5c41f908ab394c..ee5540579ed8bf 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -114,8 +114,8 @@ protected void runOneCycle() { OlapTable olapTable = (OlapTable) table; for (Partition partition : olapTable.getPartitions()) { - long version = partition.getCommittedVersion(); - long versionHash = partition.getCommittedVersionHash(); + long version = partition.getVisibleVersion(); + long versionHash = partition.getVisibleVersionHash(); for (MaterializedIndex index : partition.getMaterializedIndices()) { long indexRowCount = 0L; for (Tablet tablet : index.getTablets()) { diff --git a/fe/src/main/java/org/apache/doris/clone/Clone.java b/fe/src/main/java/org/apache/doris/clone/Clone.java index 72aa17759f6177..8b43fc0474ed3c 100644 --- a/fe/src/main/java/org/apache/doris/clone/Clone.java +++ b/fe/src/main/java/org/apache/doris/clone/Clone.java @@ -495,7 +495,7 @@ public void finishCloneJob(CloneTask task, TTabletInfo tabletInfo) { } replica.setState(ReplicaState.NORMAL); - replica.updateInfo(version, versionHash, dataSize, rowCount); + replica.updateVersionInfo(version, versionHash, dataSize, rowCount); job.setCloneFinishTimeMs(System.currentTimeMillis()); job.setState(JobState.FINISHED); diff --git a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java index 91bc0b4b1bede8..58a807d0fb19fd 100644 --- a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java @@ -17,6 +17,14 @@ package org.apache.doris.clone; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; @@ -48,15 +56,6 @@ import org.apache.doris.task.CloneTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TStorageMedium; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -972,9 +971,9 @@ private void deleteRedundantReplicas(Database db, TabletInfo tabletInfo, // and the remaining replica is not quorum if (replica.getState() != ReplicaState.CLONE && replica.getLastFailedVersion() < 0 - && (replica.getVersion() == partition.getCommittedVersion() - && replica.getVersionHash() == partition.getCommittedVersionHash() - || replica.getVersion() > partition.getCommittedVersionHash())) { + && (replica.getVersion() == partition.getVisibleVersion() + && replica.getVersionHash() == partition.getVisibleVersionHash() + || replica.getVersion() > partition.getVisibleVersionHash())) { ++realReplicaNum; } } @@ -998,8 +997,8 @@ public int compare(Replica arg0, Replica arg1) { } }); - long committedVersion = partition.getCommittedVersion(); - long committedVersionHash = partition.getCommittedVersionHash(); + long committedVersion = partition.getVisibleVersion(); + long committedVersionHash = partition.getVisibleVersionHash(); int deleteNum = replicas.size() - replicationNum; Replica deletedReplica = null; while (deleteNum > 0) { @@ -1261,8 +1260,8 @@ private void runCloneJob(CloneJob job) { short replicationNum = 0; short onlineReplicaNum = 0; short onlineReplicaNumInCluster = 0; - long committedVersion = -1L; - long committedVersionHash = -1L; + long visibleVersion = -1L; + long visibleVersionHash = -1L; int schemaHash = 0; List srcBackends = new ArrayList(); Tablet tablet = null; @@ -1346,8 +1345,8 @@ private void runCloneJob(CloneJob job) { } // sort replica by version desc - committedVersion = partition.getCommittedVersion(); - committedVersionHash = partition.getCommittedVersionHash(); + visibleVersion = partition.getVisibleVersion(); + visibleVersionHash = partition.getVisibleVersionHash(); Tablet.sortReplicaByVersionDesc(sortedReplicas); for (Replica replica : sortedReplicas) { backend = clusterInfoService.getBackend(replica.getBackendId()); @@ -1362,13 +1361,13 @@ private void runCloneJob(CloneJob job) { } // DO NOT choose replica with stale version or invalid version hash if (job.getType() != JobType.CATCHUP) { - if (replica.getVersion() > committedVersion || (replica.getVersion() == committedVersion - && replica.getVersionHash() == committedVersionHash)) { + if (replica.getVersion() > visibleVersion || (replica.getVersion() == visibleVersion + && replica.getVersionHash() == visibleVersionHash)) { srcBackends.add(new TBackend(backend.getHost(), backend.getBePort(), backend.getHttpPort())); } else { - LOG.debug("replica [{}] the version not equal to large than commit version {}" + LOG.debug("replica [{}] the version not equal to large than visible version {}" + " or commit version hash {}, ignore this replica", - replica, committedVersion, committedVersionHash); + replica, visibleVersion, visibleVersionHash); } } else { // deal with this case @@ -1378,11 +1377,11 @@ private void runCloneJob(CloneJob job) { // then C comes up, the partition's committed version is 10, then C try to clone 10, then clone finished // but last failed version is 11, it is abnormal // the publish will still fail - if (replica.getVersion() > committedVersion - || replica.getVersion() == committedVersion - && replica.getVersionHash() != committedVersionHash) { - committedVersion = replica.getVersion(); - committedVersionHash = replica.getVersionHash(); + if (replica.getVersion() > visibleVersion + || replica.getVersion() == visibleVersion + && replica.getVersionHash() != visibleVersionHash) { + visibleVersion = replica.getVersion(); + visibleVersionHash = replica.getVersionHash(); } // if this is a catchup job, then should exclude the dest backend id from src backends if (job.getDestBackendId() != backend.getId() @@ -1419,8 +1418,8 @@ private void runCloneJob(CloneJob job) { // and another clone task will send to the replica to clone again // not find a more sufficient method cloneReplica = new Replica(replicaId, job.getDestBackendId(), -1, 0, - -1, -1, ReplicaState.CLONE, partition.getCurrentVersion(), - partition.getCurrentVersionHash(), -1, 0); + -1, -1, ReplicaState.CLONE, partition.getCommittedVersion(), + partition.getCommittedVersionHash(), -1, 0); tablet.addReplica(cloneReplica); } // set the replica's state to clone @@ -1436,7 +1435,7 @@ private void runCloneJob(CloneJob job) { AgentBatchTask batchTask = new AgentBatchTask(); // very important, it is partition's commit version here CloneTask task = new CloneTask(job.getDestBackendId(), dbId, tableId, partitionId, indexId, tabletId, - schemaHash, srcBackends, storageMedium, committedVersion, committedVersionHash); + schemaHash, srcBackends, storageMedium, visibleVersion, visibleVersionHash); batchTask.addTask(task); if (clone.runCloneJob(job, task)) { AgentTaskExecutor.submit(batchTask); diff --git a/fe/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 4430f16a5e1449..26c5361deca8bd 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -17,26 +17,26 @@ package org.apache.doris.common.proc; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; + import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.RangePartitionInfo; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.TimeUtils; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Range; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,7 +47,7 @@ */ public class PartitionsProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("PartitionId").add("PartitionName").add("CommittedVersion").add("CommittedVersionHash") + .add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash") .add("State").add("PartitionKey").add("Range").add("DistributionKey") .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime") .add("LastConsistencyCheckTime") @@ -85,8 +85,8 @@ public ProcResult fetchResult() throws AnalysisException { String partitionName = partition.getName(); partitionInfo.add(partitionId); partitionInfo.add(partitionName); - partitionInfo.add(partition.getCommittedVersion()); - partitionInfo.add(partition.getCommittedVersionHash()); + partitionInfo.add(partition.getVisibleVersion()); + partitionInfo.add(partition.getVisibleVersionHash()); partitionInfo.add(partition.getState()); // partition @@ -136,8 +136,8 @@ public ProcResult fetchResult() throws AnalysisException { long partitionId = partition.getId(); partitionInfo.add(partitionId); partitionInfo.add(partitionName); - partitionInfo.add(partition.getCommittedVersion()); - partitionInfo.add(partition.getCommittedVersionHash()); + partitionInfo.add(partition.getVisibleVersion()); + partitionInfo.add(partition.getVisibleVersionHash()); partitionInfo.add(partition.getState()); // partition diff --git a/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java b/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java index af37c99d12218c..0b657c1f29e09d 100644 --- a/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java +++ b/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java @@ -164,8 +164,8 @@ public boolean sendTasks() { return false; } - checkedVersion = partition.getCommittedVersion(); - checkedVersionHash = partition.getCommittedVersionHash(); + checkedVersion = partition.getVisibleVersion(); + checkedVersionHash = partition.getVisibleVersionHash(); checkedSchemaHash = olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId()); int sentTaskReplicaNum = 0; diff --git a/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java b/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java index 8fdb765e00da17..3e329a5cc531ff 100644 --- a/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java +++ b/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java @@ -290,7 +290,7 @@ private long chooseTablet() { } // check if this partition has no data - if (partition.getCommittedVersion() == Partition.PARTITION_INIT_VERSION) { + if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION) { LOG.debug("partition[{}]'s version is {}. ignore", partition.getId(), Partition.PARTITION_INIT_VERSION); continue; @@ -327,8 +327,8 @@ private long chooseTablet() { } // check if version has already been checked - if (partition.getCommittedVersion() == tablet.getCheckedVersion() - && partition.getCommittedVersionHash() == tablet.getCheckedVersionHash()) { + if (partition.getVisibleVersion() == tablet.getCheckedVersion() + && partition.getVisibleVersionHash() == tablet.getCheckedVersionHash()) { if (tablet.isConsistent()) { LOG.debug("tablet[{}]'s version[{}-{}] has been checked. ignore", chosenTabletId, tablet.getCheckedVersion(), diff --git a/fe/src/main/java/org/apache/doris/http/rest/RowCountAction.java b/fe/src/main/java/org/apache/doris/http/rest/RowCountAction.java index 58a0d1a9ac61d7..f8c5733615fc60 100644 --- a/fe/src/main/java/org/apache/doris/http/rest/RowCountAction.java +++ b/fe/src/main/java/org/apache/doris/http/rest/RowCountAction.java @@ -92,8 +92,8 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept OlapTable olapTable = (OlapTable) table; for (Partition partition : olapTable.getPartitions()) { - long version = partition.getCommittedVersion(); - long versionHash = partition.getCommittedVersionHash(); + long version = partition.getVisibleVersion(); + long versionHash = partition.getVisibleVersionHash(); for (MaterializedIndex index : partition.getMaterializedIndices()) { long indexRowCount = 0L; for (Tablet tablet : index.getTablets()) { diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index ddec1eead5d634..79a5e9149bff41 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -1765,7 +1765,7 @@ public void unprotectQuorumLoadJob(LoadJob job, Database db) { LOG.warn("the replica[{}] is missing", info.getReplicaId()); continue; } - replica.updateInfo(info.getVersion(), info.getVersionHash(), + replica.updateVersionInfo(info.getVersion(), info.getVersionHash(), info.getDataSize(), info.getRowCount()); } } @@ -1864,7 +1864,7 @@ public void unprotectFinishLoadJob(LoadJob job, Database db) { LOG.warn("the replica[{}] is missing", info.getReplicaId()); continue; } - replica.updateInfo(info.getVersion(), info.getVersionHash(), + replica.updateVersionInfo(info.getVersion(), info.getVersionHash(), info.getDataSize(), info.getRowCount()); } } @@ -2345,7 +2345,7 @@ private boolean processQuorumFinished(LoadJob job, Database db) { private void updatePartitionVersion(Partition partition, long version, long versionHash, long jobId) { long partitionId = partition.getId(); - partition.updateCommitVersionAndVersionHash(version, versionHash); + partition.updateVisibleVersionAndVersionHash(version, versionHash); LOG.info("update partition version success. version: {}, version hash: {}, job id: {}, partition id: {}", version, versionHash, jobId, partitionId); } @@ -2528,7 +2528,7 @@ public void unprotectDelete(DeleteInfo deleteInfo, Database db) { MaterializedIndex index = partition.getIndex(info.getIndexId()); Tablet tablet = index.getTablet(info.getTabletId()); Replica replica = tablet.getReplicaById(info.getReplicaId()); - replica.updateInfo(info.getVersion(), info.getVersionHash(), + replica.updateVersionInfo(info.getVersion(), info.getVersionHash(), info.getDataSize(), info.getRowCount()); } } @@ -2587,7 +2587,7 @@ public void replayFinishAsyncDeleteJob(AsyncDeleteJob deleteJob, Catalog catalog LOG.warn("the replica[{}] is missing", info.getReplicaId()); continue; } - replica.updateInfo(info.getVersion(), info.getVersionHash(), + replica.updateVersionInfo(info.getVersion(), info.getVersionHash(), info.getDataSize(), info.getRowCount()); } } @@ -3122,8 +3122,8 @@ public void deleteOld(DeleteStmt stmt) throws DdlException { long tableId = -1; long partitionId = -1; - long committedVersion = -1; - long committedVersionHash = -1; + long visibleVersion = -1; + long visibleVersionHash = -1; long newVersion = -1; long newVersionHash = -1; AgentBatchTask deleteBatchTask = null; @@ -3153,12 +3153,12 @@ public void deleteOld(DeleteStmt stmt) throws DdlException { partitionId = partition.getId(); // pre check - committedVersion = partition.getCommittedVersion(); - committedVersionHash = partition.getCommittedVersionHash(); - checkDelete(olapTable, partition, conditions, committedVersion, committedVersionHash, + visibleVersion = partition.getVisibleVersion(); + visibleVersionHash = partition.getVisibleVersionHash(); + checkDelete(olapTable, partition, conditions, visibleVersion, visibleVersionHash, null, asyncTabletIdToBackends, true); - newVersion = committedVersion + 1; + newVersion = visibleVersion + 1; newVersionHash = Util.generateVersionHash(); deleteInfo = new DeleteInfo(db.getId(), tableId, tableName, partition.getId(), partitionName, @@ -3235,12 +3235,12 @@ public void deleteOld(DeleteStmt stmt) throws DdlException { // after check // 1. check partition committed version first - if (partition.getCommittedVersion() > committedVersion - || (committedVersion == partition.getCommittedVersion() - && committedVersionHash != partition.getCommittedVersionHash())) { + if (partition.getVisibleVersion() > visibleVersion + || (visibleVersion == partition.getVisibleVersion() + && visibleVersionHash != partition.getVisibleVersionHash())) { LOG.warn("before delete version: {}-{}. after delete version: {}-{}", - committedVersion, committedVersionHash, - partition.getCommittedVersion(), partition.getCommittedVersionHash()); + visibleVersion, visibleVersionHash, + partition.getVisibleVersion(), partition.getVisibleVersionHash()); throw new DdlException("There may have some load job done during delete job. Try again"); } diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java index ff51d2e62238b4..3a9b7ac4dd781f 100644 --- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java @@ -642,7 +642,7 @@ private ReplicaPersistInfo updateReplicaInfo(OlapTable olapTable, Partition part throw new MetaNotFoundException("cannot find replica in tablet[" + tabletId + "], backend[" + backendId + "]"); } - replica.updateInfo(version, versionHash, dataSize, rowCount); + replica.updateVersionInfo(version, versionHash, dataSize, rowCount); LOG.debug("replica[{}] report schemaHash:{}", replica.getId(), schemaHash); return ReplicaPersistInfo.createForLoad(olapTable.getId(), partition.getId(), pushIndexId, tabletId, diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index bea68c344d2328..d66be0cf8a7692 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -397,7 +397,7 @@ private static void sync(Map backendTablets, ListMultimap tabletDeleteFromMeta double bfFpp = olapTable.getBfFpp(); CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, shortKeyColumnCount, - schemaHash, partition.getCommittedVersion(), - partition.getCommittedVersionHash(), keysType, + schemaHash, partition.getVisibleVersion(), + partition.getVisibleVersionHash(), keysType, TStorageType.COLUMN, TStorageMedium.HDD, columns, bfColumns, bfFpp, null); createReplicaTasks.add(createReplicaTask); @@ -707,13 +707,13 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist"); } - long committedVersion = partition.getCommittedVersion(); - long committedVersionHash = partition.getCommittedVersionHash(); + long visibleVersion = partition.getVisibleVersion(); + long visibleVersionHash = partition.getVisibleVersionHash(); // check replica version - if (version < committedVersion || (version == committedVersion && versionHash != committedVersionHash)) { + if (version < visibleVersion || (version == visibleVersion && versionHash != visibleVersionHash)) { throw new MetaNotFoundException("version is invalid. tablet[" + version + "-" + versionHash + "]" - + ", committed[" + committedVersion + "-" + committedVersionHash + "]"); + + ", visible[" + visibleVersion + "-" + visibleVersionHash + "]"); } // check schema hash @@ -723,17 +723,17 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon } List replicas = tablet.getReplicas(); - int replicationOnLine = 0; + int onlineReplicaNum = 0; for (Replica replica : replicas) { final long id = replica.getBackendId(); final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id); if (backend != null && backend.isAlive() && !backend.isDecommissioned() && replica.getState() == ReplicaState.NORMAL) { - replicationOnLine++; + onlineReplicaNum++; } } - if (replicationOnLine < replicationNum) { + if (onlineReplicaNum < replicationNum) { long replicaId = Catalog.getInstance().getNextId(); long lastFailedVersion = -1L; long lastFailedVersionHash = 0L; @@ -741,11 +741,11 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon // this is a fatal error throw new MetaNotFoundException("version is invalid. tablet[" + version + "-" + versionHash + "]" + ", partition's max version [" + (partition.getNextVersion() - 1) + "]"); - } else if (version < partition.getCurrentVersion() - || version == partition.getCurrentVersion() - && versionHash != partition.getCurrentVersionHash()) { - lastFailedVersion = partition.getCurrentVersion(); - lastFailedVersionHash = partition.getCurrentVersionHash(); + } else if (version < partition.getCommittedVersion() + || version == partition.getCommittedVersion() + && versionHash != partition.getCommittedVersionHash()) { + lastFailedVersion = partition.getCommittedVersion(); + lastFailedVersionHash = partition.getCommittedVersionHash(); } Replica replica = new Replica(replicaId, backendId, version, versionHash, dataSize, rowCount, ReplicaState.NORMAL, diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java index 8c4b71a51d6e63..9ff17926d59135 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -411,10 +411,10 @@ private void addScanRangeLocations(Partition partition, int logNum = 0; String schemaHashStr = String.valueOf(olapTable.getSchemaHashByIndexId(index.getId())); - long committedVersion = partition.getCommittedVersion(); - long committedVersionHash = partition.getCommittedVersionHash(); - String committedVersionStr = String.valueOf(committedVersion); - String committedVersionHashStr = String.valueOf(partition.getCommittedVersionHash()); + long visibleVersion = partition.getVisibleVersion(); + long visibleVersionHash = partition.getVisibleVersionHash(); + String visibleVersionStr = String.valueOf(visibleVersion); + String visibleVersionHashStr = String.valueOf(partition.getVisibleVersionHash()); for (Tablet tablet : tablets) { long tabletId = tablet.getId(); @@ -424,19 +424,19 @@ private void addScanRangeLocations(Partition partition, TPaloScanRange paloRange = new TPaloScanRange(); paloRange.setDb_name(""); paloRange.setSchema_hash(schemaHashStr); - paloRange.setVersion(committedVersionStr); - paloRange.setVersion_hash(committedVersionHashStr); + paloRange.setVersion(visibleVersionStr); + paloRange.setVersion_hash(visibleVersionHashStr); paloRange.setTablet_id(tabletId); // random shuffle List && only collect one copy List allQueryableReplicas = Lists.newArrayList(); List localReplicas = Lists.newArrayList(); tablet.getQueryableReplicas(allQueryableReplicas, localReplicas, - committedVersion, committedVersionHash, + visibleVersion, visibleVersionHash, localBeId); if (allQueryableReplicas.isEmpty()) { LOG.error("no queryable replica found in tablet[{}]. committed version[{}], committed version hash[{}]", - tabletId, committedVersion, committedVersionHash); + tabletId, visibleVersion, visibleVersionHash); throw new UserException("Failed to get scan range, no replica!"); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 7dbc126c19b644..d3da6a0b8c0afd 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -17,6 +17,12 @@ package org.apache.doris.transaction; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.doris.alter.RollupJob; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -43,13 +49,6 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -98,10 +97,10 @@ public GlobalTransactionMgr(Catalog catalog) { } /** - * the app could specify the transactionid and + * the app could specify the transaction id * * @param coordinator - * @throws BeginTransactionException + * @throws BeginTransactionException * @throws IllegalTransactionParameterException */ public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType) @@ -184,8 +183,9 @@ public void deleteTransaction(long transactionId) { * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get db.write lock before call this api */ - public void commitTransaction(long dbId, long transactionId, List tabletCommitInfos) throws MetaNotFoundException, TransactionCommitFailedException { - + public void commitTransaction(long dbId, long transactionId, List tabletCommitInfos) + throws MetaNotFoundException, TransactionCommitFailedException { + if (Config.disable_load_job) { throw new TransactionCommitFailedException("disable_load_job is set to true, all load job is prevented"); } @@ -194,6 +194,7 @@ public void commitTransaction(long dbId, long transactionId, List getReadyToPublishTransactions() { List readyPublishTransactionState = new ArrayList<>(); List allCommittedTransactionState = null; @@ -440,6 +445,7 @@ public List getReadyToPublishTransactions() { } finally { writeUnlock(); } + for (TransactionState transactionState : allCommittedTransactionState) { boolean meetPublishPredicate = true; long dbId = transactionState.getDbId(); @@ -448,35 +454,40 @@ public List getReadyToPublishTransactions() { continue; } db.readLock(); - writeLock(); try { - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - OlapTable table = (OlapTable) db.getTable(tableCommitInfo.getTableId()); - if (null == table) { - LOG.warn("table {} is dropped after commit, ignore this table", tableCommitInfo.getTableId()); - continue; - } - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - Partition partition = table.getPartition(partitionCommitInfo.getPartitionId()); - if (null == partition) { - LOG.warn("partition {} is dropped after commit, ignore this partition", partitionCommitInfo.getPartitionId()); + readLock(); + try { + for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { + OlapTable table = (OlapTable) db.getTable(tableCommitInfo.getTableId()); + if (null == table) { + LOG.warn("table {} is dropped after commit, ignore this table", + tableCommitInfo.getTableId()); continue; } - if (partitionCommitInfo.getVersion() != partition.getCommittedVersion() + 1) { - meetPublishPredicate = false; + for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { + Partition partition = table.getPartition(partitionCommitInfo.getPartitionId()); + if (null == partition) { + LOG.warn("partition {} is dropped after commit, ignore this partition", + partitionCommitInfo.getPartitionId()); + continue; + } + if (partitionCommitInfo.getVersion() != partition.getVisibleVersion() + 1) { + meetPublishPredicate = false; + break; + } + } + if (!meetPublishPredicate) { break; } } - if (!meetPublishPredicate) { - break; + if (meetPublishPredicate) { + LOG.debug("transaction [{}] is ready to publish", transactionState); + readyPublishTransactionState.add(transactionState); } - } - if (meetPublishPredicate) { - LOG.debug("transaction [{}] is ready to publish", transactionState); - readyPublishTransactionState.add(transactionState); + } finally { + readUnlock(); } } finally { - writeUnlock(); db.readUnlock(); } } @@ -485,7 +496,6 @@ public List getReadyToPublishTransactions() { /** * if the table is deleted between commit and publish version, then should ignore the partition - * if a tablet is not find in * * @param transactionId * @param errorReplicaIds @@ -558,27 +568,38 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) { for (Tablet tablet : index.getTablets()) { int healthReplicaNum = 0; for (Replica replica : tablet.getReplicas()) { - // this means the replica is a healthy replica, it is health in the past and does not have error in current load if (!errorReplicaIds.contains(replica.getId()) && replica.getLastFailedVersion() < 0) { - if (replica.getVersion() == partition.getCommittedVersion() && replica.getVersionHash() == partition.getCommittedVersionHash() - || replica.getVersion() >= partitionCommitInfo.getVersion()) { - // during rollup the rollup replica's last failed version < 0, it maybe treated as a normal replica - // the replica is not failed during commit or publish - // during upgrade, one replica's last version maybe invalid, has to compare version hash - // if a,b,c commit 10 transactions, and then b,c crashed, we add new b',c' it has to recover, we improve a's version one by one and b' c' will recover - // from a one by one - replica.updateInfo(partitionCommitInfo.getVersion(), partitionCommitInfo.getVersionHash(), - replica.getDataSize(), replica.getRowCount()); - ++ healthReplicaNum; + // this means the replica is a healthy replica, + // it is healthy in the past and does not have error in current load + + if (replica.checkVersionCatchUp(partition.getVisibleVersion(), + partition.getVisibleVersionHash())) { + // during rollup, the rollup replica's last failed version < 0, + // it may be treated as a normal replica. + // the replica is not failed during commit or publish + // during upgrade, one replica's last version maybe invalid, + // has to compare version hash. + + // Here we still update the replica's info even if we failed to publish + // this txn, for the following case: + // replica A,B,C is successfully committed, but only A is successfully + // published, + // B and C is crashed, now we need a Clone task to repair this tablet. + // So, here we update A's version info, so that clone task will clone + // the latest version of data. + replica.updateVersionInfo(partitionCommitInfo.getVersion(), + partitionCommitInfo.getVersionHash(), + replica.getDataSize(), replica.getRowCount()); + ++healthReplicaNum; } else { // this means the replica has error in the past, but we did not observe it // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica - // A,B 's verison is 10, C's version is 10 but C' 10 is abnormal should be rollback + // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback // then we will detect this and set C's last failed version to 10 and last success version to 11 // this logic has to be replayed in checkpoint thread replica.updateVersionInfo(replica.getVersion(), replica.getVersionHash(), - partition.getCommittedVersion(), partition.getCommittedVersionHash(), + partition.getVisibleVersion(), partition.getVisibleVersionHash(), partitionCommitInfo.getVersion(), partitionCommitInfo.getVersionHash()); LOG.warn("transaction state {} has error, the replica [{}] not appeared in error replica list " + " and its version not equal to partition commit version or commit version - 1" @@ -586,10 +607,10 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) { } } else if (replica.getVersion() == partitionCommitInfo.getVersion() && replica.getVersionHash() == partitionCommitInfo.getVersionHash()) { - // the replica's version and versionhash is equal to current transaction partition's version and version hash + // the replica's version and version hash is equal to current transaction partition's version and version hash // the replica is normal, then remove it from error replica ids errorReplicaIds.remove(replica.getId()); - ++ healthReplicaNum; + ++healthReplicaNum; } if (replica.getLastFailedVersion() > 0) { // if this error replica is a base replica and it is under rollup @@ -903,12 +924,12 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat rollupJob = (RollupJob) catalog.getRollupHandler().getAlterJob(tableId); rollingUpIndex = rollupJob.getRollupIndex(partitionId); } - List allInices = new ArrayList<>(); - allInices.addAll(partition.getMaterializedIndices()); + List allIndices = new ArrayList<>(); + allIndices.addAll(partition.getMaterializedIndices()); if (rollingUpIndex != null) { - allInices.add(rollingUpIndex); + allIndices.add(rollingUpIndex); } - for (MaterializedIndex index : allInices) { + for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { long lastFailedVersion = replica.getLastFailedVersion(); @@ -919,30 +940,22 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat long lastSuccessVersionHash = replica.getLastSuccessVersionHash(); if (!errorReplicaIds.contains(replica.getId())) { if (replica.getLastFailedVersion() > 0) { - // if the replica is a failed replica, then not change version and version hash + // if the replica is a failed replica, then not changing version and version hash + newVersion = replica.getVersion(); + newVersionHash = replica.getVersionHash(); + } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), + partition.getVisibleVersionHash())) { + // this means the replica has error in the past, but we did not observe it + // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica + // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback + // then we will detect this and set C's last failed version to 10 and last success version to 11 + // this logic has to be replayed in checkpoint thread + lastFailedVersion = partition.getVisibleVersion(); + lastFailedVersionHash = partition.getVisibleVersionHash(); newVersion = replica.getVersion(); newVersionHash = replica.getVersionHash(); - } else { - if (replica.getVersion() == partition.getCommittedVersion() && replica.getVersionHash() == partition.getCommittedVersionHash() - || replica.getVersion() >= partitionCommitInfo.getVersion()) { - // during rollup the rollup replica's last failed version < 0, it maybe treated as a normal replica - // the replica is not failed during commit or publish - // during upgrade, one replica's last version maybe invalid, has to compare version hash - // if a,b,c commit 10 transactions, and then b,c crashed, we add new b',c' it has to recover, we improve a's version one by one and b' c' will recover - // from a one by one - // DO NOTHING - } else { - // this means the replica has error in the past, but we did not observe it - // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica - // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback - // then we will detect this and set C's last failed version to 10 and last success version to 11 - // this logic has to be replayed in checkpoint thread - lastFailedVersion = partition.getCommittedVersion(); - lastFailedVersionHash = partition.getCommittedVersionHash(); - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - } } + // success version always move forward lastSucessVersion = newCommitVersion; lastSuccessVersionHash = newCommitVersionHash; @@ -971,10 +984,10 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat } } } - } + } // end for indices long version = partitionCommitInfo.getVersion(); long versionHash = partitionCommitInfo.getVersionHash(); - partition.updateCommitVersionAndVersionHash(version, versionHash); + partition.updateVisibleVersionAndVersionHash(version, versionHash); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition's version to [{}] and version hash to [{}]", transactionState, version, versionHash); diff --git a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index b31f1a50cab1e9..324ff9d3f2af36 100644 --- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -17,13 +17,7 @@ package org.apache.doris.transaction; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import com.google.common.collect.Sets; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Replica; @@ -37,7 +31,13 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; -import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; public class PublishVersionDaemon extends Daemon { @@ -68,8 +68,8 @@ private void publishVersion() { // attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend // then transaction manager will treat it as success List allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false); - if (allBackends == null || allBackends.size() == 0) { - LOG.warn("some transaction state need to publish, but no alive backends!!!"); + if (allBackends.isEmpty()) { + LOG.warn("some transaction state need to publish, but no backend exists"); return; } // every backend-transaction identified a single task @@ -122,7 +122,12 @@ private void publishVersion() { TabletInvertedIndex tabletInvertedIndex = Catalog.getCurrentInvertedIndex(); // try to finish the transaction, if failed just retry in next loop + long currentTime = System.currentTimeMillis(); for (TransactionState transactionState : readyTransactionStates) { + if (currentTime - transactionState.getPublishVersionTime() < Config.publish_version_interval_millis * 2) { + // wait 2 rounds before handling publish result + continue; + } Map transTasks = transactionState.getPublishVersionTasks(); Set transErrorReplicas = Sets.newHashSet(); for (PublishVersionTask publishVersionTask : transTasks.values()) { @@ -130,7 +135,7 @@ private void publishVersion() { // sometimes backend finish publish version task, but it maybe failed to change transactionid to version for some tablets // and it will upload the failed tabletinfo to fe and fe will deal with them List errorTablets = publishVersionTask.getErrorTablets(); - if (errorTablets == null || errorTablets.size() == 0) { + if (errorTablets == null || errorTablets.isEmpty()) { continue; } else { for (long tabletId : errorTablets) { @@ -172,7 +177,7 @@ private void publishVersion() { allErrorReplicas.add(replica.getId()); if (replica.getState() != ReplicaState.CLONE && replica.getLastFailedVersion() < 1) { - ++ normalReplicasNotRespond; + ++normalReplicasNotRespond; } } if (normalReplicasNotRespond == 0 diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 78688e837c4073..49852bdc60e2d2 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -114,10 +114,10 @@ public static boolean compareCatalog(Catalog masterCatalog, Catalog slaveCatalog if (masterPartition.getId() != slavePartition.getId()) { return false; } - if (masterPartition.getCommittedVersion() != slavePartition.getCommittedVersion() - || masterPartition.getCommittedVersionHash() != slavePartition.getCommittedVersionHash() + if (masterPartition.getVisibleVersion() != slavePartition.getVisibleVersion() + || masterPartition.getVisibleVersionHash() != slavePartition.getVisibleVersionHash() || masterPartition.getNextVersion() != slavePartition.getNextVersion() - || masterPartition.getCurrentVersionHash() != slavePartition.getCurrentVersionHash()) { + || masterPartition.getCommittedVersionHash() != slavePartition.getCommittedVersionHash()) { return false; } List allMaterializedIndices = masterPartition.getMaterializedIndices(); @@ -180,7 +180,7 @@ public static Database createSimpleDb(long dbId, long tableId, long partitionId, // partition RandomDistributionInfo distributionInfo = new RandomDistributionInfo(10); Partition partition = new Partition(partitionId, testPartition1, index, distributionInfo); - partition.updateCommitVersionAndVersionHash(testStartVersion, testStartVersionHash); + partition.updateVisibleVersionAndVersionHash(testStartVersion, testStartVersionHash); partition.setNextVersion(testStartVersion + 1); partition.setNextVersionHash(testPartitionNextVersionHash, testPartitionCurrentVersionHash); diff --git a/fe/src/test/java/org/apache/doris/catalog/ReplicaTest.java b/fe/src/test/java/org/apache/doris/catalog/ReplicaTest.java index 8035a96d836970..21ed4574ecef79 100644 --- a/fe/src/test/java/org/apache/doris/catalog/ReplicaTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/ReplicaTest.java @@ -77,7 +77,7 @@ public void getMethodTest() { long newVersionHash = 87654; long newDataSize = dataSize + 100; long newRowCount = rowCount + 10; - replica.updateInfo(newVersion, newVersionHash, newDataSize, newRowCount); + replica.updateVersionInfo(newVersion, newVersionHash, newDataSize, newRowCount); Assert.assertEquals(newVersion, replica.getVersion()); Assert.assertEquals(newVersionHash, replica.getVersionHash()); Assert.assertEquals(newDataSize, replica.getDataSize()); @@ -153,7 +153,7 @@ public void testSerialization() throws Exception { public void testUpdateVersion1() { Replica originalReplica = new Replica(10000, 20000, 3, 1231, 100, 78, ReplicaState.NORMAL, 0, 0, 3, 1231); // new version is little than original version, it is invalid the version will not update - originalReplica.updateInfo(2, 111, 100, 78); + originalReplica.updateVersionInfo(2, 111, 100, 78); assertEquals(3, originalReplica.getVersion()); assertEquals(1231, originalReplica.getVersionHash()); } @@ -161,7 +161,7 @@ public void testUpdateVersion1() { @Test public void testUpdateVersion2() { Replica originalReplica = new Replica(10000, 20000, 3, 1231, 100, 78, ReplicaState.NORMAL, 0, 0, 0, 0); - originalReplica.updateInfo(3, 111, 100, 78); + originalReplica.updateVersionInfo(3, 111, 100, 78); // if new version >= current version and last success version <= new version, then last success version should be updated assertEquals(3, originalReplica.getLastSuccessVersion()); assertEquals(111, originalReplica.getLastSuccessVersionHash()); @@ -194,7 +194,7 @@ public void testUpdateVersion3() { assertEquals(100, originalReplica.getLastFailedVersionHash()); // update version to 8, the last success version and version should be 10 - originalReplica.updateInfo(8, 100, 100, 78); + originalReplica.updateVersionInfo(8, 100, 100, 78); assertEquals(10, originalReplica.getLastSuccessVersion()); assertEquals(1210, originalReplica.getLastSuccessVersionHash()); assertEquals(10, originalReplica.getVersion()); @@ -233,7 +233,7 @@ public void testUpdateVersion3() { assertEquals(1218, originalReplica.getLastFailedVersionHash()); // update version to 17 then version and success version is 17 - originalReplica.updateInfo(17, 1217, 100, 78); + originalReplica.updateVersionInfo(17, 1217, 100, 78); assertEquals(17, originalReplica.getLastSuccessVersion()); assertEquals(1217, originalReplica.getLastSuccessVersionHash()); assertEquals(17, originalReplica.getVersion()); @@ -242,7 +242,7 @@ public void testUpdateVersion3() { assertEquals(1218, originalReplica.getLastFailedVersionHash()); // update version to 18, then version and last success version should be 18 and failed version should be -1 - originalReplica.updateInfo(18, 1218, 100, 78); + originalReplica.updateVersionInfo(18, 1218, 100, 78); assertEquals(18, originalReplica.getLastSuccessVersion()); assertEquals(1218, originalReplica.getLastSuccessVersionHash()); assertEquals(18, originalReplica.getVersion()); diff --git a/fe/src/test/java/org/apache/doris/load/LoadCheckerTest.java b/fe/src/test/java/org/apache/doris/load/LoadCheckerTest.java index ccf43f16d1b768..502c796027b5d8 100644 --- a/fe/src/test/java/org/apache/doris/load/LoadCheckerTest.java +++ b/fe/src/test/java/org/apache/doris/load/LoadCheckerTest.java @@ -237,7 +237,7 @@ public void testRunLoadingJobs() throws Exception { // set table family load infos OlapTable table = (OlapTable) db.getTable(tableId); Partition partition = table.getPartition(partitionId); - long newVersion = partition.getCommittedVersion() + 1; + long newVersion = partition.getVisibleVersion() + 1; long newVersionHash = 1L; PartitionLoadInfo partitionLoadInfo = new PartitionLoadInfo(new ArrayList()); partitionLoadInfo.setVersion(newVersion); @@ -286,7 +286,7 @@ public void testRunLoadingJobs() throws Exception { for (MaterializedIndex olapIndex : partition.getMaterializedIndices()) { for (Tablet tablet : olapIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateInfo(newVersion, newVersionHash, 0L, 0L); + replica.updateVersionInfo(newVersion, newVersionHash, 0L, 0L); } } } @@ -311,7 +311,7 @@ public void testRunQuorumFinishedJobs() throws Exception { // set table family load infos OlapTable table = (OlapTable) db.getTable(tableId); Partition partition = table.getPartition(partitionId); - long newVersion = partition.getCommittedVersion() + 1; + long newVersion = partition.getVisibleVersion() + 1; long newVersionHash = 0L; PartitionLoadInfo partitionLoadInfo = new PartitionLoadInfo(new ArrayList()); partitionLoadInfo.setVersion(newVersion); @@ -328,7 +328,7 @@ public void testRunQuorumFinishedJobs() throws Exception { for (MaterializedIndex index : partition.getMaterializedIndices()) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateInfo(newVersion, newVersionHash, 0L, 0L); + replica.updateVersionInfo(newVersion, newVersionHash, 0L, 0L); } TabletLoadInfo tabletLoadInfo = new TabletLoadInfo("/label/path", 1L); tabletLoadInfos.put(tablet.getId(), tabletLoadInfo); diff --git a/fe/src/test/java/org/apache/doris/task/LoadEtlTaskTest.java b/fe/src/test/java/org/apache/doris/task/LoadEtlTaskTest.java index f420aabff0b09b..32cd8fa9f7ac63 100644 --- a/fe/src/test/java/org/apache/doris/task/LoadEtlTaskTest.java +++ b/fe/src/test/java/org/apache/doris/task/LoadEtlTaskTest.java @@ -169,7 +169,7 @@ public void testRunEtlTask() throws Exception { // verify finished Assert.assertEquals(100, job.getProgress()); - long expectVersion = partition.getCommittedVersion() + 1; + long expectVersion = partition.getVisibleVersion() + 1; Assert.assertEquals(-1, job.getIdToTableLoadInfo().get(tableId) .getIdToPartitionLoadInfo().get(paritionId).getVersion()); diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index f225f0c27d65df..fa6034a088e154 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -153,7 +153,7 @@ public void testCommitTransaction1() throws MetaNotFoundException, Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1); // check partition version - assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion()); // check partition next version Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1); @@ -214,7 +214,7 @@ public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1); // check partition version - assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion()); // check partition next version Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1); @@ -240,7 +240,7 @@ public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1); // check partition version - assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion()); // check partition next version tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1); @@ -262,7 +262,7 @@ public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, assertEquals(CatalogTestUtil.testStartVersion, replcia2.getLastSuccessVersion()); assertEquals(CatalogTestUtil.testStartVersion, replcia3.getLastSuccessVersion()); // check partition version - assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion()); transactionState = fakeEditLog.getTransaction(transactionId2); @@ -301,7 +301,7 @@ public void testFinishTransaction() throws MetaNotFoundException, TransactionCom Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1); // check partition version - assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion()); // check partition next version Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1); @@ -408,7 +408,7 @@ public void testFinishTransactionWithOneFailed() throws MetaNotFoundException, testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1); // check partition version - assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion()); // follower catalog replay the transaction @@ -432,7 +432,7 @@ public void testFinishTransactionWithOneFailed() throws MetaNotFoundException, assertEquals(CatalogTestUtil.testStartVersion + 2, replcia2.getLastSuccessVersion()); assertEquals(CatalogTestUtil.testStartVersion + 2, replcia3.getLastSuccessVersion()); // check partition version - assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getCommittedVersion()); + assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getVisibleVersion()); assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion()); transactionState = fakeEditLog.getTransaction(transactionId2);