Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public enum PartitionState {
private long committedVersionHash;
@SerializedName(value = "visibleVersion")
private long visibleVersion;
@SerializedName(value = "visibleVersionTime")
private long visibleVersionTime;
@SerializedName(value = "visibleVersionHash")
private long visibleVersionHash;
@SerializedName(value = "nextVersion")
Expand All @@ -113,6 +115,7 @@ public Partition(long id, String name,
this.baseIndex = baseIndex;

this.visibleVersion = PARTITION_INIT_VERSION;
this.visibleVersionTime = System.currentTimeMillis();
this.visibleVersionHash = PARTITION_INIT_VERSION_HASH;
// PARTITION_INIT_VERSION == 1, so the first load version is 2 !!!
this.nextVersion = PARTITION_INIT_VERSION + 1;
Expand Down Expand Up @@ -147,8 +150,7 @@ public void setState(PartitionState state) {
* the restored partition version info》
*/
public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
this.setVisibleVersion(visibleVersion, visibleVersionHash);
this.nextVersion = this.visibleVersion + 1;
this.nextVersionHash = Util.generateVersionHash();
this.committedVersionHash = visibleVersionHash;
Expand All @@ -157,8 +159,11 @@ public void updateVersionForRestore(long visibleVersion, long visibleVersionHash
}

public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
updateVisibleVersionAndVersionHash(visibleVersion, System.currentTimeMillis(), visibleVersionHash);
}

public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionTime, long visibleVersionHash) {
this.setVisibleVersion(visibleVersion, visibleVersionTime, visibleVersionHash);
if (MetaContext.get() != null) {
// MetaContext is not null means we are in a edit log replay thread.
// if it is upgrade from old palo cluster, then should update next version info
Expand All @@ -181,9 +186,26 @@ public long getVisibleVersion() {
return visibleVersion;
}

public long getVisibleVersionTime() {
return visibleVersionTime;
}

public long getVisibleVersionHash() {
return visibleVersionHash;
}

// The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated
private void setVisibleVersion(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionTime = System.currentTimeMillis();
this.visibleVersionHash = visibleVersionHash;
}

public void setVisibleVersion(long visibleVersion, long visibleVersionTime, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionTime = visibleVersionTime;
this.visibleVersionHash = visibleVersionHash;
}

public PartitionState getState() {
return this.state;
Expand Down Expand Up @@ -344,6 +366,7 @@ public void write(DataOutput out) throws IOException {
}

out.writeLong(visibleVersion);
out.writeLong(visibleVersionTime);
out.writeLong(visibleVersionHash);

out.writeLong(nextVersion);
Expand Down Expand Up @@ -379,6 +402,11 @@ public void readFields(DataInput in) throws IOException {
}

visibleVersion = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_88) {
visibleVersionTime = in.readLong();
} else {
visibleVersionTime = System.currentTimeMillis();
}
visibleVersionHash = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
nextVersion = in.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ public final class FeMetaVersion {
public static final int VERSION_86 = 86;
// spark resource, resource privilege, broker file group for hive table
public static final int VERSION_87 = 87;
// add partition visibleVersionTime
public static final int VERSION_88 = 88;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_87;
public static final int VERSION_CURRENT = VERSION_88;
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
*/
public class PartitionsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash")
.add("PartitionId").add("PartitionName")
.add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash")
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
.add("LastConsistencyCheckTime")
Expand Down Expand Up @@ -231,6 +232,7 @@ private List<List<Comparable>> getPartitionInfos() {
partitionInfo.add(partitionId);
partitionInfo.add(partitionName);
partitionInfo.add(partition.getVisibleVersion());
partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime()));
partitionInfo.add(partition.getVisibleVersionHash());
partitionInfo.add(partition.getState());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
OlapTable table = (OlapTable) db.getTable(tableId);
Partition partition = table.getPartition(partitionId);
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
partition.getNextVersion(),
partition.getNextVersionHash());
partition.getNextVersion(), partition.getNextVersionHash(),
System.currentTimeMillis() /* use as partition visible time */);
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
Expand Down Expand Up @@ -1258,8 +1258,9 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
}
} // end for indices
long version = partitionCommitInfo.getVersion();
long versionTime = partitionCommitInfo.getVersionTime();
long versionHash = partitionCommitInfo.getVersionHash();
partition.updateVisibleVersionAndVersionHash(version, versionHash);
partition.updateVisibleVersionAndVersionHash(version, versionTime, versionHash);
if (LOG.isDebugEnabled()) {
LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]",
transactionState, partition.getId(), version, versionHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,57 @@

package org.apache.doris.transaction;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.doris.common.io.Writable;

public class PartitionCommitInfo implements Writable {

@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "version")
private long version;
@SerializedName(value = "versionTime")
private long versionTime;
@SerializedName(value = "versionHash")
private long versionHash;

public PartitionCommitInfo() {

}

public PartitionCommitInfo(long partitionId, long version, long versionHash) {
public PartitionCommitInfo(long partitionId, long version, long versionHash, long visibleTime) {
super();
this.partitionId = partitionId;
this.version = version;
this.versionTime = visibleTime;
this.versionHash = versionHash;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(partitionId);
out.writeLong(version);
out.writeLong(versionHash);
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public void readFields(DataInput in) throws IOException {
partitionId = in.readLong();
version = in.readLong();
versionHash = in.readLong();
public static PartitionCommitInfo read(DataInput in) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_88) {
long partitionId = in.readLong();
long version = in.readLong();
long versionHash = in.readLong();
return new PartitionCommitInfo(partitionId, version, versionHash, System.currentTimeMillis());
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class);
}
}

public long getPartitionId() {
Expand All @@ -60,19 +77,22 @@ public long getPartitionId() {
public long getVersion() {
return version;
}

public long getVersionTime() {
return versionTime;
}

public long getVersionHash() {
return versionHash;
}

@Override
public String toString() {
StringBuffer strBuffer = new StringBuffer("partitionid=");
strBuffer.append(partitionId);
strBuffer.append(", version=");
strBuffer.append(version);
strBuffer.append(", versionHash=");
strBuffer.append(versionHash);
return strBuffer.toString();
StringBuilder sb = new StringBuilder("partitionid=");
sb.append(partitionId);
sb.append(", version=").append(version);
sb.append(", versionHash=").append(versionHash);
sb.append(", versionTime=").append(versionTime);
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.doris.transaction;

import org.apache.doris.common.io.Writable;

import com.google.common.collect.Maps;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

import org.apache.doris.common.io.Writable;
import com.google.common.collect.Maps;

public class TableCommitInfo implements Writable {

private long tableId;
Expand Down Expand Up @@ -60,8 +61,7 @@ public void readFields(DataInput in) throws IOException {
if (hasPartitionInfo) {
int elementNum = in.readInt();
for (int i = 0; i < elementNum; ++i) {
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo();
partitionCommitInfo.readFields(in);
PartitionCommitInfo partitionCommitInfo = PartitionCommitInfo.read(in);
idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(), partitionCommitInfo);
}
}
Expand Down