diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 0730bed5b84e2f..14fd6ef483e97e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -92,6 +92,8 @@ public enum PartitionState { private long committedVersionHash; @SerializedName(value = "visibleVersion") private long visibleVersion; + @SerializedName(value = "visibleVersionTime") + private long visibleVersionTime; @SerializedName(value = "visibleVersionHash") private long visibleVersionHash; @SerializedName(value = "nextVersion") @@ -113,6 +115,7 @@ public Partition(long id, String name, this.baseIndex = baseIndex; this.visibleVersion = PARTITION_INIT_VERSION; + this.visibleVersionTime = System.currentTimeMillis(); this.visibleVersionHash = PARTITION_INIT_VERSION_HASH; // PARTITION_INIT_VERSION == 1, so the first load version is 2 !!! this.nextVersion = PARTITION_INIT_VERSION + 1; @@ -147,8 +150,7 @@ public void setState(PartitionState state) { * the restored partition version info》 */ public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) { - this.visibleVersion = visibleVersion; - this.visibleVersionHash = visibleVersionHash; + this.setVisibleVersion(visibleVersion, visibleVersionHash); this.nextVersion = this.visibleVersion + 1; this.nextVersionHash = Util.generateVersionHash(); this.committedVersionHash = visibleVersionHash; @@ -157,8 +159,11 @@ public void updateVersionForRestore(long visibleVersion, long visibleVersionHash } public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) { - this.visibleVersion = visibleVersion; - this.visibleVersionHash = visibleVersionHash; + updateVisibleVersionAndVersionHash(visibleVersion, System.currentTimeMillis(), visibleVersionHash); + } + + public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionTime, long visibleVersionHash) { + this.setVisibleVersion(visibleVersion, visibleVersionTime, visibleVersionHash); if (MetaContext.get() != null) { // MetaContext is not null means we are in a edit log replay thread. // if it is upgrade from old palo cluster, then should update next version info @@ -181,9 +186,26 @@ public long getVisibleVersion() { return visibleVersion; } + public long getVisibleVersionTime() { + return visibleVersionTime; + } + public long getVisibleVersionHash() { return visibleVersionHash; } + + // The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated + private void setVisibleVersion(long visibleVersion, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionTime = System.currentTimeMillis(); + this.visibleVersionHash = visibleVersionHash; + } + + public void setVisibleVersion(long visibleVersion, long visibleVersionTime, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionTime = visibleVersionTime; + this.visibleVersionHash = visibleVersionHash; + } public PartitionState getState() { return this.state; @@ -344,6 +366,7 @@ public void write(DataOutput out) throws IOException { } out.writeLong(visibleVersion); + out.writeLong(visibleVersionTime); out.writeLong(visibleVersionHash); out.writeLong(nextVersion); @@ -379,6 +402,11 @@ public void readFields(DataInput in) throws IOException { } visibleVersion = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_88) { + visibleVersionTime = in.readLong(); + } else { + visibleVersionTime = System.currentTimeMillis(); + } visibleVersionHash = in.readLong(); if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) { nextVersion = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 7c52c9438c68e3..ec7205f97c85ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -185,6 +185,8 @@ public final class FeMetaVersion { public static final int VERSION_86 = 86; // spark resource, resource privilege, broker file group for hive table public static final int VERSION_87 = 87; + // add partition visibleVersionTime + public static final int VERSION_88 = 88; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_87; + public static final int VERSION_CURRENT = VERSION_88; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 5cd64cd18bbe38..551407ab4ac23c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -64,7 +64,8 @@ */ public class PartitionsProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash") + .add("PartitionId").add("PartitionName") + .add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash") .add("State").add("PartitionKey").add("Range").add("DistributionKey") .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime") .add("LastConsistencyCheckTime") @@ -231,6 +232,7 @@ private List> getPartitionInfos() { partitionInfo.add(partitionId); partitionInfo.add(partitionName); partitionInfo.add(partition.getVisibleVersion()); + partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime())); partitionInfo.add(partition.getVisibleVersionHash()); partitionInfo.add(partition.getState()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index aa15bc0169a710..e3c72238f84651 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -788,8 +788,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S OlapTable table = (OlapTable) db.getTable(tableId); Partition partition = table.getPartition(partitionId); PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); + partition.getNextVersion(), partition.getNextVersionHash(), + System.currentTimeMillis() /* use as partition visible time */); tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); @@ -1258,8 +1258,9 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat } } // end for indices long version = partitionCommitInfo.getVersion(); + long versionTime = partitionCommitInfo.getVersionTime(); long versionHash = partitionCommitInfo.getVersionHash(); - partition.updateVisibleVersionAndVersionHash(version, versionHash); + partition.updateVisibleVersionAndVersionHash(version, versionTime, versionHash); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]", transactionState, partition.getId(), version, versionHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java index c22a376b50a31e..b88d3e6b1e74a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java @@ -17,40 +17,57 @@ package org.apache.doris.transaction; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.doris.common.io.Writable; - public class PartitionCommitInfo implements Writable { + @SerializedName(value = "partitionId") private long partitionId; + @SerializedName(value = "version") private long version; + @SerializedName(value = "versionTime") + private long versionTime; + @SerializedName(value = "versionHash") private long versionHash; public PartitionCommitInfo() { } - public PartitionCommitInfo(long partitionId, long version, long versionHash) { + public PartitionCommitInfo(long partitionId, long version, long versionHash, long visibleTime) { super(); this.partitionId = partitionId; this.version = version; + this.versionTime = visibleTime; this.versionHash = versionHash; } @Override public void write(DataOutput out) throws IOException { - out.writeLong(partitionId); - out.writeLong(version); - out.writeLong(versionHash); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { - partitionId = in.readLong(); - version = in.readLong(); - versionHash = in.readLong(); + public static PartitionCommitInfo read(DataInput in) throws IOException { + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_88) { + long partitionId = in.readLong(); + long version = in.readLong(); + long versionHash = in.readLong(); + return new PartitionCommitInfo(partitionId, version, versionHash, System.currentTimeMillis()); + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class); + } } public long getPartitionId() { @@ -60,6 +77,10 @@ public long getPartitionId() { public long getVersion() { return version; } + + public long getVersionTime() { + return versionTime; + } public long getVersionHash() { return versionHash; @@ -67,12 +88,11 @@ public long getVersionHash() { @Override public String toString() { - StringBuffer strBuffer = new StringBuffer("partitionid="); - strBuffer.append(partitionId); - strBuffer.append(", version="); - strBuffer.append(version); - strBuffer.append(", versionHash="); - strBuffer.append(versionHash); - return strBuffer.toString(); + StringBuilder sb = new StringBuilder("partitionid="); + sb.append(partitionId); + sb.append(", version=").append(version); + sb.append(", versionHash=").append(versionHash); + sb.append(", versionTime=").append(versionTime); + return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java index 024ea74ea39af3..e59206ad577ba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java @@ -17,14 +17,15 @@ package org.apache.doris.transaction; +import org.apache.doris.common.io.Writable; + +import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; -import org.apache.doris.common.io.Writable; -import com.google.common.collect.Maps; - public class TableCommitInfo implements Writable { private long tableId; @@ -60,8 +61,7 @@ public void readFields(DataInput in) throws IOException { if (hasPartitionInfo) { int elementNum = in.readInt(); for (int i = 0; i < elementNum; ++i) { - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(); - partitionCommitInfo.readFields(in); + PartitionCommitInfo partitionCommitInfo = PartitionCommitInfo.read(in); idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(), partitionCommitInfo); } }