From ef9badc7d37300719d02282351e8e534acb4c0ae Mon Sep 17 00:00:00 2001 From: zhangdong Date: Fri, 17 Jan 2025 16:52:58 +0800 Subject: [PATCH 1/2] 1 --- .../iceberg/IcebergExternalTable.java | 6 +- .../paimon/PaimonExternalTable.java | 4 +- .../mtmv/MTMVRefreshPartitionSnapshot.java | 7 ++- .../doris/mtmv/MTMVSnapshotIdSnapshot.java | 57 +++++++++++++++++++ .../doris/mtmv/MTMVVersionSnapshot.java | 4 -- .../doris/mtmv/MTMVRefreshSnapshotTest.java | 20 +++---- 6 files changed, 78 insertions(+), 20 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIdSnapshot.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index 2feab480d7edb8..9dd5e5b56705b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -38,8 +38,8 @@ import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot; import org.apache.doris.mtmv.MTMVSnapshotIf; -import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -223,7 +223,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont if (latestSnapshotId <= 0) { throw new AnalysisException("can not find partition: " + partitionName); } - return new MTMVVersionSnapshot(latestSnapshotId); + return new MTMVSnapshotIdSnapshot(latestSnapshotId); } @Override @@ -231,7 +231,7 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { PaimonSnapshotCacheValue paimonSnapshot = getOrFetchSnapshotCacheValue(snapshot); - return new MTMVVersionSnapshot(paimonSnapshot.getSnapshot().getSnapshotId()); + return new MTMVSnapshotIdSnapshot(paimonSnapshot.getSnapshot().getSnapshotId()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index a8de5b6597bc49..d7d9b65f4a5f48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -100,10 +100,15 @@ public void compatible(MTMV mtmv) { } private void compatiblePartitions(MTMV mtmv) throws AnalysisException { + MTMVRelatedTableIf relatedTableIf = mtmv.getMvPartitionInfo().getRelatedTable(); + // Only olapTable has historical data issues that require compatibility + if (!(relatedTableIf instanceof OlapTable)) { + return; + } if (!checkHasDataWithoutPartitionId()) { return; } - OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable(); + OlapTable relatedTable = (OlapTable) relatedTableIf; for (Entry entry : partitions.entrySet()) { MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue(); if (versionSnapshot.getId() == 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIdSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIdSnapshot.java new file mode 100644 index 00000000000000..cad23795c32cda --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIdSnapshot.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.base.Objects; +import com.google.gson.annotations.SerializedName; + +/** + * use snapshotId as identification of data changes + */ +public class MTMVSnapshotIdSnapshot implements MTMVSnapshotIf { + @SerializedName("s") + private long snapshotId; + + public MTMVSnapshotIdSnapshot(long snapshotId) { + this.snapshotId = snapshotId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MTMVSnapshotIdSnapshot that = (MTMVSnapshotIdSnapshot) o; + return snapshotId == that.snapshotId; + } + + @Override + public int hashCode() { + return Objects.hashCode(snapshotId); + } + + @Override + public String toString() { + return "MTMVSnapshotIdSnapshot{" + + "snapshotId=" + snapshotId + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java index 2440649462ebf3..9ca95caebdd4b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java @@ -31,10 +31,6 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf { @SerializedName("id") private long id; - public MTMVVersionSnapshot(long version) { - this.version = version; - } - public MTMVVersionSnapshot(long version, long id) { this.version = version; this.id = id; diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java index 1890f9c9805926..3e61a7ca54aa1f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java @@ -35,8 +35,8 @@ public class MTMVRefreshSnapshotTest { private long baseExistTableId = 1L; private long correctVersion = 1L; private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot(); - private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion); - private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion); + private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion, 0); + private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion, 0); @Mocked private BaseTableInfo existTable; @Mocked @@ -92,19 +92,19 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc public void testPartitionSync() { // normal boolean sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, - new MTMVVersionSnapshot(correctVersion)); + new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertTrue(sync); // non exist mv partition sync = refreshSnapshot.equalsWithRelatedPartition("mvp2", relatedExistPartitionName, - new MTMVVersionSnapshot(correctVersion)); + new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertFalse(sync); // non exist related partition sync = refreshSnapshot - .equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion)); + .equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertFalse(sync); // snapshot value not equal sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, - new MTMVVersionSnapshot(2L)); + new MTMVVersionSnapshot(2L, 0)); Assert.assertFalse(sync); // snapshot type not equal sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, @@ -116,19 +116,19 @@ public void testPartitionSync() { public void testTableSync() { // normal boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, - new MTMVVersionSnapshot(correctVersion)); + new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertTrue(sync); // non exist mv partition sync = refreshSnapshot - .equalsWithBaseTable("mvp2", existTable, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable("mvp2", existTable, new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertFalse(sync); // non exist related partition sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, nonExistTable, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable(mvExistPartitionName, nonExistTable, new MTMVVersionSnapshot(correctVersion, 0)); Assert.assertFalse(sync); // snapshot value not equal sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(2L)); + .equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(2L, 0)); Assert.assertFalse(sync); // snapshot type not equal sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, From 6af9d0b3611614d0f94622b993b1264447a562da Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 21 Jan 2025 16:27:21 +0800 Subject: [PATCH 2/2] resolve gson --- .../src/main/java/org/apache/doris/persist/gson/GsonUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 8a5516b7616999..4c5236cf6ef531 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -203,6 +203,7 @@ import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.canal.CanalSyncJob; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; +import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.mtmv.MTMVVersionSnapshot; @@ -451,6 +452,7 @@ public class GsonUtils { RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz") .registerSubtype(MTMVMaxTimestampSnapshot.class, MTMVMaxTimestampSnapshot.class.getSimpleName()) .registerSubtype(MTMVTimestampSnapshot.class, MTMVTimestampSnapshot.class.getSimpleName()) + .registerSubtype(MTMVSnapshotIdSnapshot.class, MTMVSnapshotIdSnapshot.class.getSimpleName()) .registerSubtype(MTMVVersionSnapshot.class, MTMVVersionSnapshot.class.getSimpleName()); private static RuntimeTypeAdapterFactory dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(